diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
index 1107deff4..d3ff82bea 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -620,3 +620,23 @@ def get_dbutils(
         ]
     ),
 )
+
+OPCUA_SCHEMA = ArrayType(
+    StructType(
+        [
+            StructField("DataSetWriterId", LongType(), True),
+            StructField("Timestamp", TimestampType(), True),
+            StructField(
+                "Payload",
+                MapType(
+                    StringType(),
+                    StructType(
+                        [
+                            StructField("Value", StringType(), True),
+                        ]
+                    ),
+                ),
+            ),
+        ]
+    )
+)
diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
index 3ad52c3a7..1213605aa 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
@@ -22,14 +22,14 @@ class AIOJsonToPCDMTransformer(TransformerInterface):
     """
-    Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model.
+    Converts a Spark Dataframe column containing a json string created by AIO to the Process Control Data Model.
 
     Example
     --------
     ```python
     from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer
 
-    fledge_opcua_json_to_pcdm_transfromer = AIOJsonToPCDMTransformer(
+    aio_json_to_pcdm_transfromer = AIOJsonToPCDMTransformer(
         data=df,
         souce_column_name="body",
         status_null_value="Good",
@@ -40,7 +40,7 @@ class AIOJsonToPCDMTransformer(TransformerInterface):
     ```
 
     Parameters:
-        data (DataFrame): Dataframe containing the column with Json Fledge data
+        data (DataFrame): Dataframe containing the column with Json AIO data
         source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
         status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
         change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
new file mode 100644
index 000000000..24c31db72
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
@@ -0,0 +1,114 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import from_json, col, explode, when, lit, expr
+
+from ..interfaces import TransformerInterface
+from ..._pipeline_utils.models import Libraries, SystemType
+from ..._pipeline_utils.spark import OPCUA_SCHEMA
+
+
+class OPCUAJsonToPCDMTransformer(TransformerInterface):
+    """
+    Converts a Spark Dataframe column containing a json string created by Open Source OPCUA to the Process Control Data Model.
+
+    Example
+    --------
+    ```python
+    from rtdip_sdk.pipelines.transformers import OPCUAJsonToPCDMTransformer
+
+    opcua_json_to_pcdm_transformer = OPCUAJsonToPCDMTransformer(
+        data=df,
+        source_column_name="body",
+        status_null_value="Good",
+        change_type_value="insert"
+    )
+
+    result = opcua_json_to_pcdm_transformer.transform()
+    ```
+
+    Parameters:
+        data (DataFrame): Dataframe containing the column with Json OPCUA data
+        source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
+        status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
+        change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
+    """
+
+    data: DataFrame
+    source_column_name: str
+    status_null_value: str
+    change_type_value: str
+
+    def __init__(
+        self,
+        data: DataFrame,
+        source_column_name: str,
+        status_null_value: str = "Good",
+        change_type_value: str = "insert",
+    ) -> None:  # NOSONAR
+        self.data = data
+        self.source_column_name = source_column_name
+        self.status_null_value = status_null_value
+        self.change_type_value = change_type_value
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    def pre_transform_validation(self):
+        return True
+
+    def post_transform_validation(self):
+        return True
+
+    def transform(self) -> DataFrame:
+        """
+        Returns:
+            DataFrame: A dataframe with the specified column converted to PCDM
+        """
+        df = (
+            self.data.select(
+                from_json(col(self.source_column_name), "Messages STRING").alias("body")
+            )
+            .select(from_json(expr("body.Messages"), OPCUA_SCHEMA).alias("body"))
+            .selectExpr("inline(body)")
+            .select(col("Timestamp").alias("EventTime"), explode("Payload"))
+            .select("EventTime", col("key").alias("TagName"), "value.*")
+            .withColumn("Status", lit(self.status_null_value))
+            .withColumn(
+                "ValueType",
+                when(col("Value").cast("float").isNotNull(), "float").otherwise(
+                    "string"
+                ),
+            )
+            .withColumn("ChangeType", lit(self.change_type_value))
+        )
+
+        return df.select(
+            "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
+        )
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_json_to_pcdm.py
similarity index 90%
rename from tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py
rename to tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_json_to_pcdm.py
index 2345742b6..921720c01 100644
--- a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_opcua_json_to_pcdm.py
+++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_aio_json_to_pcdm.py
@@ -68,12 +68,12 @@ def test_aio_json_to_pcdm(spark_session: SparkSession):
         schema=expected_schema, data=expected_data
     )
 
-    eventhub_json_to_aio_transformer = AIOJsonToPCDMTransformer(
+    aio_json_to_pcdm_transformer = AIOJsonToPCDMTransformer(
         data=aio_df, source_column_name="body"
     )
-    actual_df = eventhub_json_to_aio_transformer.transform()
+    actual_df = aio_json_to_pcdm_transformer.transform()
 
-    assert eventhub_json_to_aio_transformer.system_type() == SystemType.PYSPARK
-    assert isinstance(eventhub_json_to_aio_transformer.libraries(), Libraries)
+    assert aio_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK
+    assert isinstance(aio_json_to_pcdm_transformer.libraries(), Libraries)
     assert expected_schema == actual_df.schema
     assert expected_df.collect() == actual_df.collect()
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opcua_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opcua_json_to_pcdm.py
new file mode 100644
index 000000000..d1dd885ed
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opcua_json_to_pcdm.py
@@ -0,0 +1,79 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+sys.path.insert(0, ".")
+from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.opcua_json_to_pcdm import (
+    OPCUAJsonToPCDMTransformer,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
+
+from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql.types import StructType, StructField, StringType, TimestampType
+from dateutil import parser
+
+
+def test_opcua_json_to_pcdm(spark_session: SparkSession):
+    opcua_json_data = '{"MessageId":"12345","MessageType":"test","PublisherId":"opcua_pub","Messages":[{"DataSetWriterId":12345,"Timestamp":"2024-05-07T09:54:31.6769914Z","Payload":{"tag_1":{"Value":100.2}}},{"DataSetWriterId":56789,"Timestamp":"2024-05-07T09:54:31.6509972Z","Payload":{"tag_2":{"Value":79}}}]}'
+    opcua_df: DataFrame = spark_session.createDataFrame(
+        [opcua_json_data], "string"
+    ).toDF("body")
+
+    expected_schema = StructType(
+        [
+            StructField("EventTime", TimestampType(), True),
+            StructField("TagName", StringType(), False),
+            StructField("Status", StringType(), False),
+            StructField("Value", StringType(), True),
+            StructField("ValueType", StringType(), False),
+            StructField("ChangeType", StringType(), False),
+        ]
+    )
+
+    expected_data = [
+        {
+            "TagName": "tag_1",
+            "Value": "100.2",
+            "EventTime": parser.parse("2024-05-07T09:54:31.6769914Z"),
+            "Status": "Good",
+            "ValueType": "float",
+            "ChangeType": "insert",
+        },
+        {
+            "TagName": "tag_2",
+            "Value": "79",
+            "EventTime": parser.parse("2024-05-07T09:54:31.6509972Z"),
+            "Status": "Good",
+            "ValueType": "float",
+            "ChangeType": "insert",
+        },
+    ]
+
+    expected_df: DataFrame = spark_session.createDataFrame(
+        schema=expected_schema, data=expected_data
+    )
+
+    opcua_json_to_pcdm_transformer = OPCUAJsonToPCDMTransformer(
+        data=opcua_df, source_column_name="body"
+    )
+    actual_df = opcua_json_to_pcdm_transformer.transform()
+
+    assert opcua_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK
+    assert isinstance(opcua_json_to_pcdm_transformer.libraries(), Libraries)
+    assert expected_schema == actual_df.schema
+    assert expected_df.collect() == actual_df.collect()
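For trying the new transformer outside the test suite, a minimal local sketch that mirrors the new unit test (not part of the diff; the `spark` session, `opcua_json` sample message, and app name below are illustrative, and the import path assumes running from the repo root as the test does):

```python
# Minimal sketch, assuming a local Spark install and execution from the repo root.
import sys

sys.path.insert(0, ".")
from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.opcua_json_to_pcdm import (
    OPCUAJsonToPCDMTransformer,
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("opcua-pcdm-sketch").getOrCreate()

# One OPC UA Publisher network message; Messages[].Payload maps tag names to values.
opcua_json = (
    '{"MessageId":"1","MessageType":"ua-data","PublisherId":"opcua_pub",'
    '"Messages":[{"DataSetWriterId":1,"Timestamp":"2024-05-07T09:54:31.6769914Z",'
    '"Payload":{"tag_1":{"Value":100.2}}}]}'
)
df = spark.createDataFrame([opcua_json], "string").toDF("body")

# Defaults fill Status with "Good" and ChangeType with "insert".
pcdm_df = OPCUAJsonToPCDMTransformer(data=df, source_column_name="body").transform()
pcdm_df.show(truncate=False)
# Columns: EventTime, TagName, Status, Value, ValueType, ChangeType
```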