
Commit

#106: Changed implementation without converting to pandas df
Signed-off-by: Dominik Hoffmann <[email protected]>
dh1542 committed Jan 28, 2025
1 parent e43cc4b commit 08faaf1
Showing 1 changed file with 28 additions and 78 deletions.
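
The diff below removes the `toPandas()` round-trip from `filter()` and rebuilds the smoothing as a native PySpark window + UDF pipeline. As a primer on the window-frame semantics that pipeline relies on, here is a minimal toy sketch; the data and column names are made up, and it assumes only a local PySpark installation:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("A", 1, 10.0), ("A", 2, 20.0), ("A", 3, 30.0)],
    ["id", "ts", "value"],
)

# An unbounded frame hands every row the full ordered partition:
full = (
    Window.partitionBy("id")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
# A frame ending at the current row grows row by row instead:
running = (
    Window.partitionBy("id")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.select(
    "ts",
    F.collect_list("value").over(full).alias("full_frame"),       # [10, 20, 30] for every row
    F.collect_list("value").over(running).alias("running_frame"), # [10], [10, 20], [10, 20, 30]
).show(truncate=False)
```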
@@ -1,64 +1,17 @@
import numpy as np
-from scipy.ndimage import gaussian_filter, gaussian_filter1d
-from pyspark.sql import DataFrame as PySparkDataFrame
-from enum import Enum
-import pandas as pd
+from pyspark.sql.types import FloatType
+from scipy.ndimage import gaussian_filter1d
+from pyspark.sql import DataFrame as PySparkDataFrame, Window
+from pyspark.sql import functions as F

-from rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import DataManipulationBaseInterface
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
    Libraries,
    SystemType,
)
+from ..interfaces import DataManipulationBaseInterface


class GaussianSmoothing(DataManipulationBaseInterface):
"""
Applies Gaussian smoothing to specified columns of a PySpark DataFrame.
This transformation smooths data to reduce noise and create a more continuous series of values.
It supports both temporal smoothing (over time for each ID) and spatial smoothing (across IDs at each timestamp).
The smoothing is applied using SciPys `gaussian_filter1d` function.
Args:
df (pyspark.sql.DataFrame): The input PySpark DataFrame to process.
sigma (float): The standard deviation for the Gaussian kernel, controlling the amount of smoothing.
mode (str): Smoothing mode, either 'temporal' or 'spatial'.
id_col (str): Name of the column containing unique IDs for grouping (e.g., sensors, tags).
timestamp_col (str): Name of the column containing timestamps.
value_col (str): Name of the column containing the values to smooth.
Example:
```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_manipulation.spark.data_quality.gaussian_smoothing import GaussianSmoothing
spark = SparkSession.builder.master("local[1]").appName("GaussianSmoothingExample").getOrCreate()
# Example DataFrame
data = [
("Sensor1", "2024-01-02 03:49:45.000", 0.13),
("Sensor1", "2024-01-02 07:53:11.000", 0.12),
("Sensor1", "2024-01-02 11:56:42.000", 0.13),
("Sensor1", "2024-01-02 16:00:12.000", 0.15),
("Sensor1", "2024-01-02 20:03:46.000", 0.34),
]
columns = ["TagName", "EventTime", "Value"]
df = spark.createDataFrame(data, columns)
smoother = GaussianSmoothing(
df=df,
sigma=2.0,
mode="temporal", # Choose between "temporal" and "spatial"
id_col="TagName",
timestamp_col="EventTime",
value_col="Value"
)
result_df = smoother.filter()
result_df.show()
```
"""

    def __init__(
        self,
        df: PySparkDataFrame,
@@ -68,15 +21,13 @@ def __init__(
        timestamp_col: str = "timestamp",
        value_col: str = "value",
    ) -> None:
-
        if not isinstance(df, PySparkDataFrame):
            raise TypeError("df must be a PySpark DataFrame")
        if not isinstance(sigma, (int, float)) or sigma <= 0:
            raise ValueError("sigma must be a positive number")
        if mode not in ["temporal", "spatial"]:
            raise ValueError("mode must be either 'temporal' or 'spatial'")

-        # Validate column existence
        if id_col not in df.columns:
            raise ValueError(f"Column {id_col} not found in DataFrame")
        if timestamp_col not in df.columns:
@@ -93,10 +44,6 @@ def __init__(

    @staticmethod
    def system_type():
-        """
-        Attributes:
-            SystemType (Environment): Requires PYSPARK
-        """
        return SystemType.PYSPARK

    @staticmethod
@@ -110,30 +57,33 @@ def settings() -> dict:

    def filter(self) -> PySparkDataFrame:
-        pdf = self.df.toPandas()
-
-        if self.mode == "temporal":
-            pdf[self.value_col] = pdf.groupby(self.id_col)[self.value_col].transform(
-                lambda x: gaussian_filter1d(x.astype(float).values, sigma=self.sigma)
-            )
-        else:  # spatial
-            unique_timestamps = pdf[self.timestamp_col].unique()
-
-            for timestamp in unique_timestamps:
-                mask = pdf[self.timestamp_col] == timestamp
-                ids = pdf.loc[mask, self.id_col].values
-                values = pdf.loc[mask, self.value_col].astype(float).values
-
-                sorted_indices = np.argsort(ids)
-                ids_sorted = ids[sorted_indices]
-                values_sorted = values[sorted_indices]
-
-                smoothed_values = gaussian_filter1d(values_sorted, sigma=self.sigma)
-
-                reverse_indices = np.argsort(sorted_indices)
-                pdf.loc[mask, self.value_col] = smoothed_values[reverse_indices]
-
-        spark = self.df.sparkSession
-        result_df = spark.createDataFrame(pdf)
-
-        return result_df
+        def create_gaussian_smoother(sigma_value):
+            # Close over a plain float rather than self so the UDF's
+            # serialized closure stays small and picklable.
+            def apply_gaussian(values, position):
+                if not values:
+                    return None
+                values_array = np.array([float(v) for v in values])
+                smoothed = gaussian_filter1d(values_array, sigma=sigma_value)
+                # Return the smoothed value at this row's own position.
+                return float(smoothed[position])
+
+            return apply_gaussian
+
+        smooth_udf = F.udf(create_gaussian_smoother(self.sigma), FloatType())
+
+        if self.mode == "temporal":
+            order_window = Window.partitionBy(self.id_col).orderBy(self.timestamp_col)
+        else:  # spatial mode
+            order_window = Window.partitionBy(self.timestamp_col).orderBy(self.id_col)
+
+        # Unbounded frame: every row receives the complete ordered series.
+        full_window = order_window.rangeBetween(
+            Window.unboundedPreceding, Window.unboundedFollowing
+        )
+
+        collect_list_expr = F.collect_list(F.col(self.value_col)).over(full_window)
+        position_expr = F.row_number().over(order_window) - 1
+
+        return self.df.withColumn(
+            self.value_col, smooth_udf(collect_list_expr, position_expr)
+        )
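
For context, the new `filter()` boils down to collecting each partition's ordered values into a list and letting a SciPy-backed UDF pick out the smoothed value for each row. Below is a self-contained sketch of that pattern with toy data; the names `make_smoother` and `SmoothingSketch` are illustrative only, and it assumes pyspark, numpy, and scipy are installed:

```python
import numpy as np
from scipy.ndimage import gaussian_filter1d
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

spark = SparkSession.builder.master("local[1]").appName("SmoothingSketch").getOrCreate()

df = spark.createDataFrame(
    [
        ("Sensor1", "2024-01-02 03:49:45", 0.13),
        ("Sensor1", "2024-01-02 07:53:11", 0.12),
        ("Sensor1", "2024-01-02 11:56:42", 0.13),
        ("Sensor1", "2024-01-02 16:00:12", 0.15),
        ("Sensor1", "2024-01-02 20:03:46", 0.34),
    ],
    ["TagName", "EventTime", "Value"],
)

def make_smoother(sigma):
    # The factory closes over a plain float so the UDF does not drag
    # a class instance into its serialized closure.
    def smooth(values, position):
        arr = np.array(values, dtype=float)
        return float(gaussian_filter1d(arr, sigma=sigma)[position])
    return smooth

smooth_udf = F.udf(make_smoother(2.0), FloatType())

order_w = Window.partitionBy("TagName").orderBy("EventTime")
# Every row receives the complete ordered series of its partition ...
full_w = order_w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# ... and picks the smoothed value at its own position in that series.
df.withColumn(
    "Value",
    smooth_udf(F.collect_list("Value").over(full_w), F.row_number().over(order_w) - 1),
).show(truncate=False)
```

One design note: building the UDF through a factory that closes over a plain `sigma` float keeps the function's closure free of the enclosing object, which Spark would otherwise have to pickle and ship to every executor.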
