From 5a796a00414594d9d29a20803fcdbce4c4eef640 Mon Sep 17 00:00:00 2001
From: cching95 <73163191+cching95@users.noreply.github.com>
Date: Mon, 20 May 2024 16:12:18 +0100
Subject: [PATCH] AIO Transformer and Unit Test (#745)

* add aio transformer

Signed-off-by: Chloe Ching

* aio unit test

Signed-off-by: Chloe Ching

* unit test

Signed-off-by: Chloe Ching

* add mend

Signed-off-by: Chloe Ching

* unit tests for aio transformer

Signed-off-by: Chloe Ching

* add aio transformer to init

Signed-off-by: Chloe Ching

* add line under spark.py

Signed-off-by: Chloe Ching

* update eventime in unit tests

Signed-off-by: Chloe Ching

---------

Signed-off-by: Chloe Ching
---
 .whitesource                                  |  13 +-
 .../pipelines/_pipeline_utils/spark.py        |  10 ++
 .../pipelines/transformers/__init__.py        |   1 +
 .../transformers/spark/aio_json_to_pcdm.py    | 114 ++++++++++++++++++
 .../spark/test_aio_opcua_json_to_pcdm.py      |  79 ++++++++++++
 5 files changed, 205 insertions(+), 12 deletions(-)
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py

diff --git a/.whitesource b/.whitesource
index 9c7ae90b4..a76605899 100644
--- a/.whitesource
+++ b/.whitesource
@@ -1,14 +1,3 @@
 {
-    "scanSettings": {
-        "baseBranches": []
-    },
-    "checkRunSettings": {
-        "vulnerableCheckRunConclusionLevel": "failure",
-        "displayMode": "diff",
-        "useMendCheckNames": true
-    },
-    "issueSettings": {
-        "minSeverityLevel": "LOW",
-        "issueType": "DEPENDENCY"
-    }
+    "settingsInheritedFrom": "sede-x/whitesource-config@main"
 }
\ No newline at end of file
diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
index 5bd278e4b..1107deff4 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -610,3 +610,13 @@ def get_dbutils(
         StructField("sourceName", StringType(), True),
     ]
 )
+
+AIO_SCHEMA = MapType(
+    StringType(),
+    StructType(
+        [
+            StructField("SourceTimestamp", TimestampType(), True),
+            StructField("Value", StringType(), True),
+        ]
+    ),
+)
diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py
index ecb4062f6..542eeedea 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py
@@ -18,6 +18,7 @@
 from .spark.fledge_opcua_json_to_pcdm import *
 from .spark.ssip_pi_binary_file_to_pcdm import *
 from .spark.ssip_pi_binary_json_to_pcdm import *
+from .spark.aio_json_to_pcdm import *
 from .spark.iso import *
 from .spark.edgex_opcua_json_to_pcdm import *
 from .spark.ecmwf.nc_extractbase_to_weather_data_model import *
diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
new file mode 100644
index 000000000..3ad52c3a7
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
@@ -0,0 +1,114 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import from_json, col, explode, when, lit, expr
+
+from ..interfaces import TransformerInterface
+from ..._pipeline_utils.models import Libraries, SystemType
+from ..._pipeline_utils.spark import AIO_SCHEMA
+
+
+class AIOJsonToPCDMTransformer(TransformerInterface):
+    """
+    Converts a Spark Dataframe column containing a json string created by AIO to the Process Control Data Model.
+
+    Example
+    --------
+    ```python
+    from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer
+
+    aio_json_to_pcdm_transformer = AIOJsonToPCDMTransformer(
+        data=df,
+        source_column_name="body",
+        status_null_value="Good",
+        change_type_value="insert"
+    )
+
+    result = aio_json_to_pcdm_transformer.transform()
+    ```
+
+    Parameters:
+        data (DataFrame): Dataframe containing the column with the Json AIO data
+        source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
+        status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
+        change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
+    """
+
+    data: DataFrame
+    source_column_name: str
+    status_null_value: str
+    change_type_value: str
+
+    def __init__(
+        self,
+        data: DataFrame,
+        source_column_name: str,
+        status_null_value: str = "Good",
+        change_type_value: str = "insert",
+    ) -> None:  # NOSONAR
+        self.data = data
+        self.source_column_name = source_column_name
+        self.status_null_value = status_null_value
+        self.change_type_value = change_type_value
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    def pre_transform_validation(self):
+        return True
+
+    def post_transform_validation(self):
+        return True
+
+    def transform(self) -> DataFrame:
+        """
+        Returns:
+            DataFrame: A dataframe with the specified column converted to PCDM
+        """
+        df = (
+            self.data.select(
+                from_json(col(self.source_column_name), "Payload STRING").alias("body")
+            )
+            .select(from_json(expr("body.Payload"), AIO_SCHEMA).alias("body"))
+            .select(explode("body"))
+            .select(col("key").alias("TagName"), "value.*")
+            .select(col("SourceTimestamp").alias("EventTime"), "TagName", "Value")
+            .withColumn("Status", lit(self.status_null_value))
+            .withColumn(
+                "ValueType",
+                when(col("Value").cast("float").isNotNull(), "float").otherwise(
+                    "string"
+                ),
+            )
+            .withColumn("ChangeType", lit(self.change_type_value))
+        )
+
+        return df.select(
+            "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
+        )
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py
new file mode 100644
index 000000000..2345742b6
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py
@@ -0,0 +1,79 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+sys.path.insert(0, ".")
+from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.aio_json_to_pcdm import (
+    AIOJsonToPCDMTransformer,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
+
+from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql.types import StructType, StructField, StringType, TimestampType
+from dateutil import parser
+
+
+def test_aio_json_to_pcdm(spark_session: SparkSession):
+    aio_json_data = '{"SequenceNumber":12345,"Timestamp":"2024-05-13T13:05:10.975317Z","DataSetWriterName":"test","MessageType":"test","Payload":{"test_tag1":{"SourceTimestamp":"2024-05-13T13:05:19.7278555Z","Value":67},"test_tag2":{"SourceTimestamp":"2024-05-13T13:05:19.7288616Z","Value":165.5}}}'
+    aio_df: DataFrame = spark_session.createDataFrame([aio_json_data], "string").toDF(
+        "body"
+    )
+
+    expected_schema = StructType(
+        [
+            StructField("EventTime", TimestampType(), True),
+            StructField("TagName", StringType(), False),
+            StructField("Status", StringType(), False),
+            StructField("Value", StringType(), True),
+            StructField("ValueType", StringType(), False),
+            StructField("ChangeType", StringType(), False),
+        ]
+    )
+
+    expected_data = [
+        {
+            "TagName": "test_tag1",
+            "Value": "67",
+            "EventTime": parser.parse("2024-05-13T13:05:19.7278555Z"),
+            "Status": "Good",
+            "ValueType": "float",
+            "ChangeType": "insert",
+        },
+        {
+            "TagName": "test_tag2",
+            "Value": "165.5",
+            "EventTime": parser.parse("2024-05-13T13:05:19.7288616Z"),
+            "Status": "Good",
+            "ValueType": "float",
+            "ChangeType": "insert",
+        },
+    ]
+
+    expected_df: DataFrame = spark_session.createDataFrame(
+        schema=expected_schema, data=expected_data
+    )
+
+    eventhub_json_to_aio_transformer = AIOJsonToPCDMTransformer(
+        data=aio_df, source_column_name="body"
+    )
+    actual_df = eventhub_json_to_aio_transformer.transform()
+
+    assert eventhub_json_to_aio_transformer.system_type() == SystemType.PYSPARK
+    assert isinstance(eventhub_json_to_aio_transformer.libraries(), Libraries)
+    assert expected_schema == actual_df.schema
+    assert expected_df.collect() == actual_df.collect()
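Usage sketch (not part of the patch): the snippet below runs the new `AIOJsonToPCDMTransformer` end to end against the sample AIO payload from the unit test. It is a minimal example assuming `rtdip_sdk` (including this change) and PySpark are installed; a local `SparkSession` and a hard-coded JSON string stand in for the streaming source that would normally supply the `body` column. The import path and parameter names are taken from the code above.

```python
# Minimal sketch (assumptions: rtdip_sdk with this change and PySpark installed;
# a local SparkSession replaces a real streaming source).
from pyspark.sql import SparkSession

from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer

spark = SparkSession.builder.appName("aio_to_pcdm_example").getOrCreate()

# AIO OPC UA JSON message: the transformer reads only the "Payload" map, where each
# key is a tag name and each value carries a SourceTimestamp and a Value.
aio_json = (
    '{"SequenceNumber":12345,"Timestamp":"2024-05-13T13:05:10.975317Z",'
    '"DataSetWriterName":"test","MessageType":"test",'
    '"Payload":{"test_tag1":{"SourceTimestamp":"2024-05-13T13:05:19.7278555Z","Value":67},'
    '"test_tag2":{"SourceTimestamp":"2024-05-13T13:05:19.7288616Z","Value":165.5}}}'
)
df = spark.createDataFrame([aio_json], "string").toDF("body")

# Defaults give Status="Good" and ChangeType="insert"; both can be overridden
# via status_null_value and change_type_value in the constructor.
pcdm_df = AIOJsonToPCDMTransformer(data=df, source_column_name="body").transform()

# One PCDM row per tag: EventTime, TagName, Status, Value, ValueType, ChangeType.
pcdm_df.show(truncate=False)
```

With the sample payload this produces the same two rows the unit test asserts: `test_tag1` and `test_tag2`, each typed as `float` because their values cast cleanly to float.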