Commit

Mirico Metadata Transformer and Unit Tests (rtdip#758)
* mirico metadata transformer

Signed-off-by: Chloe Ching <[email protected]>

* unit tests and update sql query builder to remove warning

Signed-off-by: Chloe Ching <[email protected]>

* update docs

Signed-off-by: Chloe Ching <[email protected]>

* update mirico transformer docs

Signed-off-by: Chloe Ching <[email protected]>

* update docs

Signed-off-by: Chloe Ching <[email protected]>

---------

Signed-off-by: Chloe Ching <[email protected]>
cching95 authored May 31, 2024
1 parent 380bf64 commit 38af92d
Showing 9 changed files with 241 additions and 6 deletions.
14 changes: 14 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -640,3 +640,17 @@ def get_dbutils(
]
)
)

MIRICO_METADATA_SCHEMA = StructType(
[
StructField("retroName", StringType(), True),
StructField("siteName", StringType(), True),
StructField("retroAltitude", FloatType(), True),
StructField("sensorAltitude", FloatType(), True),
StructField("retroLongitude", FloatType(), True),
StructField("gasType", StringType(), True),
StructField("sensorLatitude", FloatType(), True),
StructField("retroLatitude", FloatType(), True),
StructField("sensorLongitude", FloatType(), True),
]
)
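As a quick sanity check, the new schema can be exercised directly with `from_json`. A minimal sketch, assuming an installed `rtdip_sdk` that exposes `MIRICO_METADATA_SCHEMA` as added above, a local SparkSession, and an illustrative payload (field values are hypothetical):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

# Schema added in this commit (rtdip_sdk.pipelines._pipeline_utils.spark).
from rtdip_sdk.pipelines._pipeline_utils.spark import MIRICO_METADATA_SCHEMA

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical Mirico message body; keys match the schema fields above.
body = (
    '{"siteName": "SITE", "retroName": "RETRO", "gasType": "CH4", '
    '"retroAltitude": 10.0, "retroLongitude": 1.0, "retroLatitude": 50.0, '
    '"sensorAltitude": 12.0, "sensorLongitude": 1.1, "sensorLatitude": 50.1}'
)
df = spark.createDataFrame([{"body": body}])

# Parse the JSON string into a typed struct column using the new schema.
parsed = df.select(from_json(col("body"), MIRICO_METADATA_SCHEMA).alias("body"))
parsed.select("body.siteName", "body.gasType", "body.retroAltitude").show()
```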
@@ -30,5 +30,6 @@
from .spark.honeywell_apm_to_pcdm import *
from .spark.sem_json_to_pcdm import *
from .spark.mirico_json_to_pcdm import *
from .spark.mirico_json_to_metadata import *
from .spark.pandas_to_pyspark import *
from .spark.pyspark_to_pandas import *
@@ -41,7 +41,7 @@ class AIOJsonToPCDMTransformer(TransformerInterface):
Parameters:
data (DataFrame): Dataframe containing the column with Json AIO data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
source_column_name (str): Spark Dataframe column containing the Json AIO data
status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
"""
@@ -0,0 +1,119 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import DataFrame
import logging
from pyspark.sql.functions import (
from_json,
col,
lit,
concat_ws,
upper,
expr,
)
from ...._sdk_utils.compare_versions import (
_package_version_meets_minimum,
)
from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.spark import MIRICO_METADATA_SCHEMA


class MiricoJsonToMetadataTransformer(TransformerInterface):
"""
Converts a Spark Dataframe column containing a json string created by Mirico to the Metadata Model.
Example
--------
```python
from rtdip_sdk.pipelines.transformers import MiricoJsonToMetadataTransformer
mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer(
data=df,
source_column_name="body"
)
result = mirico_json_to_metadata_transformer.transform()
```
Parameters:
data (DataFrame): Dataframe containing the column with Mirico data
source_column_name (str): Spark Dataframe column containing the Json Mirico data
"""

data: DataFrame
source_column_name: str

def __init__(self, data: DataFrame, source_column_name: str) -> None:
_package_version_meets_minimum("pyspark", "3.4.0")
self.data = data
self.source_column_name = source_column_name

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
return True

def post_transform_validation(self):
return True

def transform(self) -> DataFrame:
"""
Returns:
DataFrame: A dataframe with the specified column converted to Metadata model
"""

df = self.data.select(
from_json(self.source_column_name, MIRICO_METADATA_SCHEMA).alias("body"),
)

tag_name_expr = concat_ws(
"_",
*[
upper(col("body.siteName")),
upper(col("body.retroName")),
upper(col("body.gasType")),
]
)

df = df.select(
tag_name_expr.alias("TagName"),
lit("").alias("Description"),
lit("").alias("UoM"),
expr(
"""struct(
body.retroAltitude,
body.retroLongitude,
body.retroLatitude,
body.sensorAltitude,
body.sensorLongitude,
body.sensorLatitude)"""
).alias("Properties"),
).dropDuplicates(["TagName"])

return df.select("TagName", "Description", "UoM", "Properties")
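For orientation, a minimal usage sketch of the transformer above, assuming pyspark >= 3.4.0 (required by the constructor), a local SparkSession, and a hypothetical Mirico payload (site, retro and gas values are illustrative):

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.transformers import MiricoJsonToMetadataTransformer

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical Mirico message body.
body = (
    '{"siteName": "Aberdeen", "retroName": "Retro01", "gasType": "CH4", '
    '"retroAltitude": 10.0, "retroLongitude": 1.0, "retroLatitude": 50.0, '
    '"sensorAltitude": 12.0, "sensorLongitude": 1.1, "sensorLatitude": 50.1}'
)
df = spark.createDataFrame([{"body": body}])

result = MiricoJsonToMetadataTransformer(data=df, source_column_name="body").transform()

# TagName is siteName, retroName and gasType upper-cased and joined with "_",
# e.g. "ABERDEEN_RETRO01_CH4"; Properties holds the retro/sensor coordinates.
result.show(truncate=False)
```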
@@ -56,8 +56,8 @@ class MiricoJsonToPCDMTransformer(TransformerInterface):
```
Parameters:
data (DataFrame): Dataframe containing the column with SEM data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
data (DataFrame): Dataframe containing the column with Mirico data
source_column_name (str): Spark Dataframe column containing the Json Mirico data
status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
tagname_field (optional str): If populated, will add the specified field to the TagName column.
@@ -22,7 +22,7 @@

class OPCUAJsonToPCDMTransformer(TransformerInterface):
"""
Converts a Spark Dataframe column containing a json string created by Open Source OPCUA to the Process Control Data Model.
Converts a Spark Dataframe column containing a json string created by Open Source OPC UA to the Process Control Data Model.
Example
--------
@@ -40,7 +40,7 @@ class OPCUAJsonToPCDMTransformer(TransformerInterface):
```
Parameters:
data (DataFrame): Dataframe containing the column with Json OPCUA data
data (DataFrame): Dataframe containing the column with Json OPC UA data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
@@ -57,7 +57,7 @@ class SEMJsonToPCDMTransformer(TransformerInterface):
Parameters:
data (DataFrame): Dataframe containing the column with SEM data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
source_column_name (str): Spark Dataframe column containing the Json SEM data
version (int): The version for the OBC field mappings. The latest version is 10.
status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/queries/sql/sql_query.py
@@ -49,6 +49,7 @@ def get(
"""
try:
parameters_dict = {"sql_statement": sql_query}
parameters_dict["supress_warning"] = True
if limit:
parameters_dict["limit"] = limit
if offset:
@@ -0,0 +1,100 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

sys.path.insert(0, ".")
from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_metadata import (
MiricoJsonToMetadataTransformer,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
Libraries,
SystemType,
)

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pytest
from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import (
_package_version_meets_minimum,
)


def test_mirico_json_to_metadata(spark_session: SparkSession):
mirico_json_data = '{"gasType": "test", "retroLongitude": 123.45, "retroLatitude": 123.45 , "sensorAltitude": 123.45, "sensorLongitude": 123.45, "sensorLatitude": 123.45, "retroName": "test", "siteName": "test", "retroAltitude": 123.45}'
mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}])

expected_schema = StructType(
[
StructField("TagName", StringType(), False),
StructField("Description", StringType(), False),
StructField("UoM", StringType(), False),
StructField(
"Properties",
StructType(
[
StructField("retroAltitude", FloatType(), True),
StructField("retroLongitude", FloatType(), True),
StructField("retroLatitude", FloatType(), True),
StructField("sensorAltitude", FloatType(), True),
StructField("sensorLongitude", FloatType(), True),
StructField("sensorLatitude", FloatType(), True),
]
),
False,
),
]
)

expected_data = [
{
"TagName": "TEST_TEST_TEST",
"Description": "",
"UoM": "",
"Properties": {
"retroAltitude": 123.45,
"retroLongitude": 123.45,
"retroLatitude": 123.45,
"sensorAltitude": 123.45,
"sensorLongitude": 123.45,
"sensorLatitude": 123.45,
},
}
]

expected_df: DataFrame = spark_session.createDataFrame(
schema=expected_schema, data=expected_data
)

try:
if _package_version_meets_minimum("pyspark", "3.4.0"):
mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer(
data=mirico_df, source_column_name="body"
)
actual_df = mirico_json_to_metadata_transformer.transform()

assert (
mirico_json_to_metadata_transformer.system_type() == SystemType.PYSPARK
)
assert isinstance(
mirico_json_to_metadata_transformer.libraries(), Libraries
)

assert expected_schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
except:
with pytest.raises(Exception):
mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer(
data=mirico_df, source_column_name="body"
)
