Skip to content

Commit

Permalink
update docs and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ozancicek committed Sep 13, 2020
1 parent e93acae commit 0b713f2
Show file tree
Hide file tree
Showing 19 changed files with 59 additions and 37 deletions.
5 changes: 2 additions & 3 deletions docs/ekfguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ for defining the features matrix, and the nonlinear update is done with the defi
val a = 0.2
val b = 0.7
val noiseParam = 1.0
val stateSize = 2
val measurementSize = 1
// UDFs for generating the measurement vector ([y]) and the measurement model matrix ([[x, 1]])
val measurementUDF = udf((x: Double, r: Double) => {
Expand Down Expand Up @@ -90,8 +88,9 @@ for defining the features matrix, and the nonlinear update is done with the defi
new DenseMatrix(1, 2, jacs)
}
val filter = new ExtendedKalmanFilter(stateSize, measurementSize)
val filter = new ExtendedKalmanFilter()
.setStateKeyCol("stateKey")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(2, 2, Array(10.0, 0.0, 0.0, 10.0)))
.setMeasurementCol("measurement")
Expand Down
7 changes: 5 additions & 2 deletions docs/gmmguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ consecutive numbers with timestamps. These consecutive numbers are binned to sim
.withColumn("sample", mixture)
// Set initial values and hyperparams.
val gmm = new MultivariateGaussianMixture(3)
val gmm = new MultivariateGaussianMixture()
.setInitialWeights(Array(0.33, 0.33, 0.33))
.setStateKeyCol("stateKey")
.setInitialMeans(Array(Array(3.0, 5.0), Array(6.0, 6.0), Array(7.0, 1.0)))
.setInitialCovariances(Array(Array(1.0, 0.0, 0.0, 1.0), Array(1.0, 0.0, 0.0, 1.0), Array(1.0, 0.0, 0.0, 1.0)))
Expand Down Expand Up @@ -204,7 +205,9 @@ consecutive numbers with timestamps. These consecutive numbers are binned to sim
eye = [1.0, 0.0, 0.0, 1.0]
gmm = MultivariateGaussianMixture(3)\
gmm = MultivariateGaussianMixture()\
.setMixtureCount(3)\
.setInitialWeights([0.33, 0.33, 0.33])\
.setStateKeyCol("stateKey")\
.setInitialMeans([[3.0, 5.0], [6.0, 6.0], [7.0, 1.0]])\
.setInitialCovariances([eye, eye, eye])\
Expand Down
6 changes: 4 additions & 2 deletions docs/lkfguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ can be same for all measurements/filters, so their values are set directly with

.. code-block:: scala
val filter = new LinearKalmanFilter(stateSize, measurementSize)
val filter = new LinearKalmanFilter()
.setInitialStateMean(new DenseVector(Array(0.0, 0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(3, 3, Array(10.0, 0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 10.0)))
.setStateKeyCol("stateKey")
Expand Down Expand Up @@ -213,10 +214,11 @@ can be same for all measurements/filters, so their values are set directly with

.. code-block:: python
lkf = LinearKalmanFilter(state_size, measurement_size)\
lkf = LinearKalmanFilter()\
.setStateKeyCol("stateKey")\
.setMeasurementCol("label")\
.setMeasurementModelCol("features")\
.setInitialStateMean(Vectors.dense([0.0, 0.0, 0.0]))\
.setInitialStateCovariance(Matrices.dense(3, 3, [10, 0, 0, 0, 10, 0, 0, 0, 10]))\
.setProcessModel(Matrices.dense(3, 3, [1, 0, 0, 0, 1, 0, 0, 0, 1]))\
.setProcessNoise(Matrices.dense(3, 3, [0] * 9))\
Expand Down
3 changes: 2 additions & 1 deletion docs/mmaeguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ the same window. This window can be set from ``setMultipleModelMeasurementWindow

.. code-block:: scala
val filter = new LinearKalmanFilter(stateSize, measurementsSize)
val filter = new LinearKalmanFilter()
.setInitialStateMean(new DenseVector(Array(0.0, 0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(3, 3, Array(10.0, 0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 10.0)))
.setStateKeyCol("stateKey")
Expand Down
12 changes: 9 additions & 3 deletions docs/rlsguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ identifying different models and their incremented index.
val truncate = udf((state: DenseVector) => state.values.map(t => (math floor t * 100)/100))
val filter = new RecursiveLeastSquaresFilter(featuresSize)
val filter = new RecursiveLeastSquaresFilter()
.setStateKeyCol("stateKey")
.setInitialEstimate(new DenseVector(Array(0.0, 0.0, 0.0)))
.setRegularizationMatrixFactor(10E6)
.setForgettingFactor(0.99)
val query = filter.transform(features)
.select($"stateKey", $"stateIndex", truncate($"state.mean").alias("modelParameters"))
Expand Down Expand Up @@ -180,8 +183,11 @@ identifying different models and their incremented index.

.. code-block:: python
rls = RecursiveLeastSquaresFilter(features_size)\
.setStateKeyCol("stateKey")
rls = RecursiveLeastSquaresFilter()\
.setStateKeyCol("stateKey")\
.setInitialEstimate(Vectors.dense([0.0, 0.0, 0.0]))\
.setRegularizationMatrixFactor(10E6)\
.setForgettingFactor(0.99)
query = rls.transform(measurements)\
.writeStream\
Expand Down
13 changes: 8 additions & 5 deletions docs/stateguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ typically use the state with largest stateIndex as a snapshot of the state.
// Batch dataframe of measurements
val batchMeasurements: DataFrame = ...
val batchFilter = new LinearKalmanFilter(2, 1)
val batchFilter = new LinearKalmanFilter()
.setStateKeyCol("stateKey")
.setMeasurementCol("measurement")
Expand Down Expand Up @@ -123,9 +123,12 @@ can be set from the input dataframe. This will cause measurements to be processe
// Filter for estimating local linear increasing trend
val filter = new LinearKalmanFilter(2, 1)
val filter = new LinearKalmanFilter()
.setMeasurementCol("measurement")
.setEventTimeCol("eventTime")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
DenseMatrix.eye(2))
.setProcessModel(
new DenseMatrix(2, 2, Array(1, 0, 1, 1)))
.setProcessNoise(
Expand Down Expand Up @@ -202,7 +205,7 @@ column to help propagating watermarks.

.. code-block:: scala
val filter = new LinearKalmanFilter(2, 1)
val filter = new LinearKalmanFilter()
.setMeasurementCol("measurement")
.setEventTimeCol("eventTime")
.setWatermarkDuration("10 seconds")
Expand Down Expand Up @@ -232,7 +235,7 @@ the state will be initialized as if it received no measurements. Supported value
// Event time based state timeout. States receiving no measurements for 12 hours will be cleared.
// Timeout duration is measured with event time, so event time column must be set.
val filter = new LinearKalmanFilter(2, 1)
val filter = new LinearKalmanFilter()
.setStateKeyCol("modelId")
.setMeasurementCol("measurement")
.setEventTimeCol("eventTime")
Expand All @@ -241,7 +244,7 @@ the state will be initialized as if it received no measurements. Supported value
// Process time based state timeout. States receiving no measurements for 12 hours will be cleared.
// Timeout duration is measured with processing time, so it is not necessary to set the event time column.
val filter = new LinearKalmanFilter(2, 1)
val filter = new LinearKalmanFilter()
.setStateKeyCol("modelId")
.setMeasurementCol("measurement")
.setStateTimeoutDuration("12 hours")
Expand Down
5 changes: 2 additions & 3 deletions docs/ukfguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ should be ``(Vector, Matrix) => Vector``.
val a = 0.2
val b = 0.7
val noiseParam = 1.0
val stateSize = 2
val measurementSize = 1
// UDFs for generating the measurement vector ([y]) and the measurement model matrix ([[x, 1]])
val measurementUDF = udf((x: Double, r: Double) => {
Expand All @@ -51,8 +49,9 @@ should be ``(Vector, Matrix) => Vector``.
measurement
}
val filter = new UnscentedKalmanFilter(stateSize, measurementSize)
val filter = new UnscentedKalmanFilter()
.setStateKeyCol("stateKey")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
DenseMatrix.eye(2))
.setMeasurementCol("measurement")
Expand Down
4 changes: 3 additions & 1 deletion examples/src/main/python/streaming/gmm_rate_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@
initial_means = [[3.0, 5.0], [6.0, 6.0], [7.0, 1.0]]
eye = [1.0, 0.0, 0.0, 1.0]
initial_covs = [eye, eye, eye]
gmm = MultivariateGaussianMixture(3)\
gmm = MultivariateGaussianMixture()\
.setMixtureCount(3)\
.setInitialWeights([0.33, 0.33, 0.33])\
.setStateKeyCol("stateKey")\
.setInitialMeans(initial_means)\
.setInitialCovariances(initial_covs)\
Expand Down
3 changes: 2 additions & 1 deletion examples/src/main/python/streaming/lkf_rate_source_llt.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@
.withColumn("stateKey", F.col("mod").cast("String"))\
.withColumn("trend", (F.col("value")/num_states).cast("Integer") + F.randn() * noise_param)

lkf = LinearKalmanFilter(2, 1)\
lkf = LinearKalmanFilter()\
.setStateKeyCol("stateKey")\
.setMeasurementCol("measurement")\
.setInitialStateMean(Vectors.dense([0.0, 0.0]))\
.setInitialStateCovariance(Matrices.dense(2, 2, [10000.0, 0.0, 0.0, 10000.0]))\
.setProcessModel(Matrices.dense(2, 2, [1.0, 0.0, 1.0, 1.0]))\
.setProcessNoise(Matrices.dense(2, 2, [0.0001, 0.0, 0.0, 0.0001]))\
Expand Down
7 changes: 3 additions & 4 deletions examples/src/main/python/streaming/lkf_rate_source_ols.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
b = 0.2
c = 1.2
noise_param = 1
state_size = 3
measurement_size = 1

label_udf = F.udf(lambda x, y, w: Vectors.dense([x * a + y * b + c + w]), VectorUDT())
features_udf = F.udf(lambda x, y: Matrices.dense(1, 3, [x, y, 1]), MatrixUDT())
Expand All @@ -101,10 +99,11 @@
.withColumn("label", label_udf("x", "y", "w"))\
.withColumn("features", features_udf("x", "y"))

lkf = LinearKalmanFilter(state_size, measurement_size)\
lkf = LinearKalmanFilter()\
.setStateKeyCol("stateKey")\
.setMeasurementCol("label")\
.setMeasurementModelCol("features")\
.setMeasurementModelCol("features") \
.setInitialStateMean(Vectors.dense([0.0, 0.0, 0.0]))\
.setInitialStateCovariance(Matrices.dense(3, 3, [10, 0, 0, 0, 10, 0, 0, 0, 10]))\
.setProcessModel(Matrices.dense(3, 3, [1, 0, 0, 0, 1, 0, 0, 0, 1]))\
.setProcessNoise(Matrices.dense(3, 3, [0] * 9))\
Expand Down
5 changes: 3 additions & 2 deletions examples/src/main/python/streaming/rls_rate_source_ols.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.linalg import Vectors

if __name__ == "__main__":
"""
Expand Down Expand Up @@ -87,8 +87,9 @@
.withColumn("w", F.randn(0) * noise_param)\
.withColumn("label", label_expression)

rls = RecursiveLeastSquaresFilter(3)\
rls = RecursiveLeastSquaresFilter()\
.setStateKeyCol("stateKey")\
.setInitialEstimate(Vectors.dense([0.0, 0.0, 0.0]))\
.setRegularizationMatrixFactor(10E6)\
.setForgettingFactor(0.99)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ object EKFRateSourceGLMLog {
val a = 0.2
val b = 0.7
val noiseParam = 1.0
val stateSize = 2
val measurementSize = 1

// UDFs for generating the measurement vector ([y]) and the measurement model matrix ([[x, 1]])
val measurementUDF = udf((x: Double, r: Double) => {
Expand Down Expand Up @@ -83,8 +81,9 @@ object EKFRateSourceGLMLog {
new DenseMatrix(1, 2, jacs)
}

val filter = new ExtendedKalmanFilter(stateSize, measurementSize)
val filter = new ExtendedKalmanFilter()
.setStateKeyCol("modelID")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(2, 2, Array(10.0, 0.0, 0.0, 10.0)))
.setMeasurementCol("measurement")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object GMMRateSource {
val weight = rand(seed=0)
val mixture = when(weight < 0.2, dist1).when(weight < 0.5, dist2).otherwise(dist3)

val gmm = new MultivariateGaussianMixture(3)
val gmm = new MultivariateGaussianMixture()
.setInitialWeights(Array(0.33, 0.33, 0.33))
.setStateKeyCol("stateKey")
.setInitialMeans(Array(Array(3.0, 5.0), Array(6.0, 6.0), Array(7.0, 1.0)))
.setInitialCovariances(Array(Array(1.0, 0.0, 0.0, 1.0), Array(1.0, 0.0, 0.0, 1.0), Array(1.0, 0.0, 0.0, 1.0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ object LKFRateSourceLLT {

val measurementUdf = udf((t: Long, r: Double) => new DenseVector(Array(t.toDouble + r)))

val filter = new LinearKalmanFilter(2, 1)
val filter = new LinearKalmanFilter()
.setStateKeyCol("stateKey")
.setMeasurementCol("measurement")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(2, 2, Array(1000, 0, 0, 1000)))
.setProcessModel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ object LKFRateSourceOLS {
.withColumn("label", labelUDF($"x", $"y", randn() * noiseParam))
.withColumn("features", featuresUDF($"x", $"y"))

val filter = new LinearKalmanFilter(stateSize, measurementsSize)
val filter = new LinearKalmanFilter()
.setInitialStateMean(new DenseVector(Array(0.0, 0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(3, 3, Array(10.0, 0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 10.0)))
.setStateKeyCol("stateKey")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ object LKFStreamBatchInit {

val measurementUdf = udf((t: Long, r: Double) => new DenseVector(Array(t.toDouble + r)))

val batchFilter = new LinearKalmanFilter(2, 1)
val batchFilter = new LinearKalmanFilter()
.setStateKeyCol("stateKey")
.setMeasurementCol("measurement")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(2, 2, Array(1000, 0, 0, 1000)))
.setProcessModel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ object MMAERateSourceOLS {
.withColumn("label", labelUDF($"x", $"y", randn() * noiseParam))
.withColumn("features", featuresUDF($"x", $"y"))

val filter = new LinearKalmanFilter(stateSize, measurementsSize)
val filter = new LinearKalmanFilter()
.setInitialStateMean(new DenseVector(Array(0.0, 0.0, 0.0)))
.setInitialStateCovariance(
new DenseMatrix(3, 3, Array(10.0, 0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 10.0)))
.setStateKeyCol("stateKey")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ object RLSRateSourceOLS {
a*x + b*y + c + r
})

val filter = new RecursiveLeastSquaresFilter(3)
val filter = new RecursiveLeastSquaresFilter()
.setStateKeyCol("stateKey")
.setInitialEstimate(new DenseVector(Array(0.0, 0.0, 0.0)))
.setRegularizationMatrixFactor(10E6)
.setForgettingFactor(0.99)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ object UKFRateSourceGLMLog {
measurement
}

val filter = new UnscentedKalmanFilter(stateSize, measurementSize)
val filter = new UnscentedKalmanFilter()
.setStateKeyCol("stateKey")
.setInitialStateMean(new DenseVector(Array(0.0, 0.0)))
.setInitialStateCovariance(
DenseMatrix.eye(2))
.setMeasurementCol("measurement")
Expand Down

0 comments on commit 0b713f2

Please sign in to comment.