diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py index d3ff82bea..198b91431 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py @@ -640,3 +640,17 @@ def get_dbutils( ] ) ) + +MIRICO_METADATA_SCHEMA = StructType( + [ + StructField("retroName", StringType(), True), + StructField("siteName", StringType(), True), + StructField("retroAltitude", FloatType(), True), + StructField("sensorAltitude", FloatType(), True), + StructField("retroLongitude", FloatType(), True), + StructField("gasType", StringType(), True), + StructField("sensorLatitude", FloatType(), True), + StructField("retroLatitude", FloatType(), True), + StructField("sensorLongitude", FloatType(), True), + ] +) diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py index 71fc6a973..f08ff19f8 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py @@ -30,5 +30,6 @@ from .spark.honeywell_apm_to_pcdm import * from .spark.sem_json_to_pcdm import * from .spark.mirico_json_to_pcdm import * +from .spark.mirico_json_to_metadata import * from .spark.pandas_to_pyspark import * from .spark.pyspark_to_pandas import * diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py index 1213605aa..bc0e2dce0 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py @@ -41,7 +41,7 @@ class AIOJsonToPCDMTransformer(TransformerInterface): Parameters: data (DataFrame): Dataframe containing the column with Json AIO data - source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data + source_column_name (str): Spark Dataframe column containing the Json AIO data status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value. change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. """ diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py new file mode 100644 index 000000000..6d7ef0158 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py @@ -0,0 +1,119 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyspark.sql import DataFrame +import logging +from pyspark.sql.functions import ( + from_json, + col, + lit, + concat_ws, + upper, + expr, +) +from ...._sdk_utils.compare_versions import ( + _package_version_meets_minimum, +) +from ..interfaces import TransformerInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.spark import MIRICO_METADATA_SCHEMA + + +class MiricoJsonToMetadataTransformer(TransformerInterface): + """ + Converts a Spark Dataframe column containing a json string created from Mirico to the Metadata Model. + + Example + -------- + ```python + from rtdip_sdk.pipelines.transformers import MiricoJsonToMetadataTransformer + + mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer( + data=df + source_column_name="body" + ) + + result = mirico_json_to_metadata_transformer.transform() + ``` + + Parameters: + data (DataFrame): Dataframe containing the column with Mirico data + source_column_name (str): Spark Dataframe column containing the Json Mirico data + """ + + data: DataFrame + source_column_name: str + + def __init__(self, data: DataFrame, source_column_name: str) -> None: + _package_version_meets_minimum("pyspark", "3.4.0") + self.data = data + self.source_column_name = source_column_name + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_transform_validation(self): + return True + + def post_transform_validation(self): + return True + + def transform(self) -> DataFrame: + """ + Returns: + DataFrame: A dataframe with the specified column converted to Metadata model + """ + + df = self.data.select( + from_json(self.source_column_name, MIRICO_METADATA_SCHEMA).alias("body"), + ) + + tag_name_expr = concat_ws( + "_", + *[ + upper(col("body.siteName")), + upper(col("body.retroName")), + upper(col("body.gasType")), + ] + ) + + df = df.select( + tag_name_expr.alias("TagName"), + lit("").alias("Description"), + lit("").alias("UoM"), + expr( + """struct( + body.retroAltitude, + body.retroLongitude, + body.retroLatitude, + body.sensorAltitude, + body.sensorLongitude, + body.sensorLatitude)""" + ).alias("Properties"), + ).dropDuplicates(["TagName"]) + + return df.select("TagName", "Description", "UoM", "Properties") diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py index 4275a0d57..651451173 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py @@ -56,8 +56,8 @@ class MiricoJsonToPCDMTransformer(TransformerInterface): ``` Parameters: - data (DataFrame): Dataframe containing the column with SEM data - source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data + data (DataFrame): Dataframe containing the column with Mirico data + source_column_name (str): Spark Dataframe column containing the Json Mirico data status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value. change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. tagname_field (optional str): If populated, will add the specified field to the TagName column. diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py index 24c31db72..df2297eda 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py @@ -22,7 +22,7 @@ class OPCUAJsonToPCDMTransformer(TransformerInterface): """ - Converts a Spark Dataframe column containing a json string created by Open Source OPCUA to the Process Control Data Model. + Converts a Spark Dataframe column containing a json string created by Open Source OPC UA to the Process Control Data Model. Example -------- @@ -40,7 +40,7 @@ class OPCUAJsonToPCDMTransformer(TransformerInterface): ``` Parameters: - data (DataFrame): Dataframe containing the column with Json OPCUA data + data (DataFrame): Dataframe containing the column with Json OPC UA data source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value. change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py index d028cc7b2..ea7448ece 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py @@ -57,7 +57,7 @@ class SEMJsonToPCDMTransformer(TransformerInterface): Parameters: data (DataFrame): Dataframe containing the column with SEM data - source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data + source_column_name (str): Spark Dataframe column containing the Json SEM data version (int): The version for the OBC field mappings. The latest version is 10. status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value. change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. diff --git a/src/sdk/python/rtdip_sdk/queries/sql/sql_query.py b/src/sdk/python/rtdip_sdk/queries/sql/sql_query.py index a0891bb1b..1ad92d41e 100644 --- a/src/sdk/python/rtdip_sdk/queries/sql/sql_query.py +++ b/src/sdk/python/rtdip_sdk/queries/sql/sql_query.py @@ -49,6 +49,7 @@ def get( """ try: parameters_dict = {"sql_statement": sql_query} + parameters_dict["supress_warning"] = True if limit: parameters_dict["limit"] = limit if offset: diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_metadata.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_metadata.py new file mode 100644 index 000000000..df84a4ca0 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_metadata.py @@ -0,0 +1,100 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +sys.path.insert(0, ".") +from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_metadata import ( + MiricoJsonToMetadataTransformer, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( + Libraries, + SystemType, +) + +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql.types import StructType, StructField, StringType, FloatType +import pytest +from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import ( + _package_version_meets_minimum, +) + + +def test_mirico_json_to_metadata(spark_session: SparkSession): + mirico_json_data = '{"gasType": "test", "retroLongitude": 123.45, "retroLatitude": 123.45 , "sensorAltitude": 123.45, "sensorLongitude": 123.45, "sensorLatitude": 123.45, "retroName": "test", "siteName": "test", "retroAltitude": 123.45}' + mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}]) + + expected_schema = StructType( + [ + StructField("TagName", StringType(), False), + StructField("Description", StringType(), False), + StructField("UoM", StringType(), False), + StructField( + "Properties", + StructType( + [ + StructField("retroAltitude", FloatType(), True), + StructField("retroLongitude", FloatType(), True), + StructField("retroLatitude", FloatType(), True), + StructField("sensorAltitude", FloatType(), True), + StructField("sensorLongitude", FloatType(), True), + StructField("sensorLatitude", FloatType(), True), + ] + ), + False, + ), + ] + ) + + expected_data = [ + { + "TagName": "TEST_TEST_TEST", + "Description": "", + "UoM": "", + "Properties": { + "retroAltitude": 123.45, + "retroLongitude": 123.45, + "retroLatitude": 123.45, + "sensorAltitude": 123.45, + "sensorLongitude": 123.45, + "sensorLatitude": 123.45, + }, + } + ] + + expected_df: DataFrame = spark_session.createDataFrame( + schema=expected_schema, data=expected_data + ) + + try: + if _package_version_meets_minimum("pyspark", "3.4.0"): + mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer( + data=mirico_df, source_column_name="body" + ) + actual_df = mirico_json_to_metadata_transformer.transform() + + assert ( + mirico_json_to_metadata_transformer.system_type() == SystemType.PYSPARK + ) + assert isinstance( + mirico_json_to_metadata_transformer.libraries(), Libraries + ) + + assert expected_schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + except: + with pytest.raises(Exception): + mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer( + data=mirico_df, source_column_name="body" + )