
Commit 28ea445

zhengruifeng authored and srowen committed
[SPARK-28159][ML] Make the transform natively in ml framework to avoid extra conversion
## What changes were proposed in this pull request?

Make the transform natively in ml framework to avoid extra conversion. There are many TODOs in the current ml module, like `// TODO: Make the transformer natively in ml framework to avoid extra conversion.` in ChiSqSelector. This PR makes ml algorithms no longer convert an ml-vector to an mllib-vector and back in their transforms. Covered: LDA/ChiSqSelector/ElementwiseProduct/HashingTF/IDF/Normalizer/PCA/StandardScaler.

## How was this patch tested?

Existing test suites.

Closes apache#24963 from zhengruifeng/to_ml_vector.

Authored-by: zhengruifeng <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent e11a558 commit 28ea445

16 files changed (+348 -195 lines)
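The theme across these files is the same: each `transform` used to wrap an mllib transformer in a UDF, converting every ml vector to an mllib vector and back on every row. A rough before/after sketch of that pattern, not taken from the patch itself (the `* 2.0` work and the `oldStyle`/`newStyle` names are purely illustrative stand-ins for the real per-algorithm transformers):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.sql.functions.udf

// Before: every row pays for an ml -> mllib -> ml round trip.
val oldStyle = udf { v: Vector =>
  val oldVec = OldVectors.fromML(v)                   // copy into an mllib vector
  OldVectors.dense(oldVec.toArray.map(_ * 2.0)).asML  // do the work, then copy back to ml
}

// After: the same work expressed directly on ml vectors, no conversion.
val newStyle = udf { v: Vector =>
  Vectors.dense(v.toArray.map(_ * 2.0))
}
```

The per-file hunks below apply this restructuring to each algorithm.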

mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala (+45 -8)

@@ -19,6 +19,8 @@ package org.apache.spark.ml.clustering
 
 import java.util.Locale
 
+import breeze.linalg.normalize
+import breeze.numerics.exp
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JObject
@@ -27,23 +29,23 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
   EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
-  LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
+  LDAOptimizer => OldLDAOptimizer, LDAUtils => OldLDAUtils, LocalLDAModel => OldLocalLDAModel,
   OnlineLDAOptimizer => OldOnlineLDAOptimizer}
 import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.MatrixImplicits._
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
+import org.apache.spark.sql.functions.{monotonically_increasing_id, udf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.PeriodicCheckpointer
@@ -457,21 +459,56 @@ abstract class LDAModel private[ml] (
    */
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
-    if ($(topicDistributionCol).nonEmpty) {
+    transformSchema(dataset.schema, logging = true)
 
-      // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-      val transformer = oldLocalModel.getTopicDistributionMethod
+    if ($(topicDistributionCol).nonEmpty) {
+      val func = getTopicDistributionMethod
+      val transformer = udf(func)
 
-      val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
       dataset.withColumn($(topicDistributionCol),
-        t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF()
+        transformer(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
     } else {
       logWarning("LDAModel.transform was called without any output columns. Set an output column" +
        " such as topicDistributionCol to produce results.")
      dataset.toDF()
    }
  }
 
+  /**
+   * Get a method usable as a UDF for `topicDistributions()`
+   */
+  private def getTopicDistributionMethod: Vector => Vector = {
+    val expElogbeta = exp(OldLDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
+    val oldModel = oldLocalModel
+    val docConcentrationBrz = oldModel.docConcentration.asBreeze
+    val gammaShape = oldModel.gammaShape
+    val k = oldModel.k
+    val gammaSeed = oldModel.seed
+
+    vector: Vector =>
+      if (vector.numNonzeros == 0) {
+        Vectors.zeros(k)
+      } else {
+        val (ids: List[Int], cts: Array[Double]) = vector match {
+          case v: DenseVector => ((0 until v.size).toList, v.values)
+          case v: SparseVector => (v.indices.toList, v.values)
+          case other =>
+            throw new UnsupportedOperationException(
+              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+        }
+
+        val (gamma, _, _) = OldOnlineLDAOptimizer.variationalTopicInference(
+          ids,
+          cts,
+          expElogbeta,
+          docConcentrationBrz,
+          gammaShape,
+          k,
+          gammaSeed)
+        Vectors.dense(normalize(gamma, 1.0).toArray)
+      }
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
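The new `getTopicDistributionMethod` closes over the model's `expElogbeta` and Dirichlet parameters, runs `variationalTopicInference` per document, and L1-normalizes the resulting `gamma` so the topic weights sum to 1. A minimal plain-Scala illustration of that final normalization step (hypothetical weights, standing in for `normalize(gamma, 1.0)`):

```scala
// Hypothetical per-topic weights; the real gamma comes from variational inference.
val gamma = Array(2.0, 6.0, 2.0)
val total = gamma.sum
val topicDistribution = gamma.map(_ / total)  // Array(0.2, 0.6, 0.2), sums to 1
```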

mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala (-4)

@@ -19,10 +19,6 @@ package org.apache.spark.ml.feature
 
 import java.{util => ju}
 
-import org.json4s.JsonDSL._
-import org.json4s.JValue
-import org.json4s.jackson.JsonMethods._
-
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Model

mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala (+19 -8)

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
 import org.apache.spark.ml.attribute.{AttributeGroup, _}
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -264,14 +264,25 @@ final class ChiSqSelectorModel private[ml] (
 
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
-    val transformedSchema = transformSchema(dataset.schema, logging = true)
-    val newField = transformedSchema.last
-
-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML
+    val outputSchema = transformSchema(dataset.schema, logging = true)
+
+    val newSize = selectedFeatures.length
+    val func = { vector: Vector =>
+      vector match {
+        case SparseVector(_, indices, values) =>
+          val (newIndices, newValues) = chiSqSelector.compressSparse(indices, values)
+          Vectors.sparse(newSize, newIndices, newValues)
+        case DenseVector(values) =>
+          Vectors.dense(chiSqSelector.compressDense(values))
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+      }
+    }
 
-    val selector = udf(transformer)
-    dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(featuresCol))),
+      outputSchema($(outputCol)).metadata)
   }
 
   @Since("1.6.0")

mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala (+21 -5)

@@ -19,11 +19,11 @@ package org.apache.spark.ml.feature
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.UnaryTransformer
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param.Param
 import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
-import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.feature.{ElementwiseProduct => OldElementwiseProduct}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -55,8 +55,24 @@ class ElementwiseProduct @Since("1.4.0") (@Since("1.4.0") override val uid: Stri
 
   override protected def createTransformFunc: Vector => Vector = {
     require(params.contains(scalingVec), s"transformation requires a weight vector")
-    val elemScaler = new feature.ElementwiseProduct($(scalingVec))
-    v => elemScaler.transform(v)
+    val elemScaler = new OldElementwiseProduct(OldVectors.fromML($(scalingVec)))
+    val vectorSize = $(scalingVec).size
+
+    vector: Vector => {
+      require(vector.size == vectorSize,
+        s"vector sizes do not match: Expected $vectorSize but found ${vector.size}")
+      vector match {
+        case DenseVector(values) =>
+          val newValues = elemScaler.transformDense(values)
+          Vectors.dense(newValues)
+        case SparseVector(size, indices, values) =>
+          val (newIndices, newValues) = elemScaler.transformSparse(indices, values)
+          Vectors.sparse(size, newIndices, newValues)
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+      }
+    }
   }
 
   override protected def outputDataType: DataType = new VectorUDT()
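The transform itself is a Hadamard (element-wise) product with the scaling vector, and the sparse branch only touches the stored entries. A minimal dense-case sketch with hypothetical values:

```scala
// output(i) = scalingVec(i) * input(i)
val scalingVec = Array(1.0, 0.0, 2.0)
val input = Array(3.0, 4.0, 5.0)
val output = scalingVec.zip(input).map { case (w, x) => w * x }  // Array(3.0, 0.0, 10.0)
```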

mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala (+12 -6)

@@ -20,10 +20,11 @@ package org.apache.spark.ml.feature
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.feature
+import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{ArrayType, StructType}
@@ -93,11 +94,16 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     val outputSchema = transformSchema(dataset.schema)
-    val hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))
-    // TODO: Make the hashingTF.transform natively in ml framework to avoid extra conversion.
-    val t = udf { terms: Seq[_] => hashingTF.transform(terms).asML }
-    val metadata = outputSchema($(outputCol)).metadata
-    dataset.select(col("*"), t(col($(inputCol))).as($(outputCol), metadata))
+
+    val hashingTF = new OldHashingTF($(numFeatures)).setBinary($(binary))
+    val func = (terms: Seq[_]) => {
+      val seq = hashingTF.transformImpl(terms)
+      Vectors.sparse(hashingTF.numFeatures, seq)
+    }
+
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(inputCol))),
+      outputSchema($(outputCol)).metadata)
   }
 
   @Since("1.4.0")

mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala (+19 -4)

@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -132,9 +132,24 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
-    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
-    dataset.withColumn($(outputCol), idf(col($(inputCol))))
+
+    val func = { vector: Vector =>
+      vector match {
+        case SparseVector(size, indices, values) =>
+          val (newIndices, newValues) = feature.IDFModel.transformSparse(idfModel.idf,
+            indices, values)
+          Vectors.sparse(size, newIndices, newValues)
+        case DenseVector(values) =>
+          val newValues = feature.IDFModel.transformDense(idfModel.idf, values)
+          Vectors.dense(newValues)
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
+    }
+
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(inputCol))))
   }
 
   @Since("1.4.0")

mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala (+20 -14)

@@ -26,11 +26,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, DenseVector => OldDenseVector,
-  Matrices => OldMatrices, Vector => OldVector, Vectors => OldVectors}
-import org.apache.spark.mllib.linalg.MatrixImplicits._
-import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Vectors => OldVectors}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -92,12 +88,13 @@ class PCA @Since("1.5.0") (
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): PCAModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+    val input = dataset.select($(inputCol)).rdd.map {
       case Row(v: Vector) => OldVectors.fromML(v)
     }
     val pca = new feature.PCA(k = $(k))
     val pcaModel = pca.fit(input)
-    copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
+    copyValues(new PCAModel(uid, pcaModel.pc.asML, pcaModel.explainedVariance.asML)
+      .setParent(this))
   }
 
   @Since("1.5.0")
@@ -149,15 +146,24 @@ class PCAModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    val pcaModel = new feature.PCAModel($(k),
-      OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
-      OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])
 
-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML
+    val func = { vector: Vector =>
+      vector match {
+        case dv: DenseVector =>
+          pc.transpose.multiply(dv)
+        case SparseVector(size, indices, values) =>
+          /* SparseVector -> single row SparseMatrix */
+          val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose
+          val projection = sm.multiply(pc)
+          Vectors.dense(projection.values)
+        case _ =>
+          throw new IllegalArgumentException("Unsupported vector format. Expected " +
+            s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
      }
+    }
 
-    val pcaOp = udf(transformer)
-    dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(inputCol))))
   }
 
   @Since("1.5.0")

mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala (+28 -5)

@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -162,11 +162,34 @@ class StandardScalerModel private[ml] (
     transformSchema(dataset.schema, logging = true)
     val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))
 
-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML
+    val func = if ($(withMean)) {
+      vector: Vector =>
+        val values = vector match {
+          // specially handle DenseVector because its toArray does not clone already
+          case d: DenseVector => d.values.clone()
+          case v: Vector => v.toArray
+        }
+        val newValues = scaler.transfromWithMean(values)
+        Vectors.dense(newValues)
+    } else if ($(withStd)) {
+      vector: Vector =>
+        vector match {
+          case DenseVector(values) =>
+            val newValues = scaler.transformDenseWithStd(values)
+            Vectors.dense(newValues)
+          case SparseVector(size, indices, values) =>
+            val (newIndices, newValues) = scaler.transformSparseWithStd(indices, values)
+            Vectors.sparse(size, newIndices, newValues)
+          case other =>
+            throw new UnsupportedOperationException(
              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+        }
+    } else {
+      vector: Vector => vector
+    }
 
-    val scale = udf(transformer)
-    dataset.withColumn($(outputCol), scale(col($(inputCol))))
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(inputCol))))
   }
 
   @Since("1.4.0")
