opcua transformer and unit tests (rtdip#751)
Signed-off-by: Chloe Ching <[email protected]>
cching95 authored May 22, 2024
1 parent f2396bd commit e0ffdc7
Showing 5 changed files with 220 additions and 7 deletions.
20 changes: 20 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -620,3 +620,23 @@ def get_dbutils(
]
),
)

OPCUA_SCHEMA = ArrayType(
StructType(
[
StructField("DataSetWriterId", LongType(), True),
StructField("Timestamp", TimestampType(), True),
StructField(
"Payload",
MapType(
StringType(),
StructType(
[
StructField("Value", StringType(), True),
]
),
),
),
]
)
)
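For reference, the `Messages` array that `OPCUA_SCHEMA` describes has the shape sketched below. The field names and nesting come from the schema above, the example values are taken from the unit test payload later in this commit, and `sample_messages` is just an illustrative name.

```python
# Illustrative shape of the "Messages" array that OPCUA_SCHEMA parses.
# Payload values arrive as numbers in the raw JSON but are read back as
# strings here because the schema declares Value as StringType.
sample_messages = [
    {
        "DataSetWriterId": 12345,
        "Timestamp": "2024-05-07T09:54:31.6769914Z",
        "Payload": {"tag_1": {"Value": "100.2"}},
    },
    {
        "DataSetWriterId": 56789,
        "Timestamp": "2024-05-07T09:54:31.6509972Z",
        "Payload": {"tag_2": {"Value": "79"}},
    },
]
```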
@@ -22,14 +22,14 @@

class AIOJsonToPCDMTransformer(TransformerInterface):
"""
Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model.
Converts a Spark Dataframe column containing a json string created by AIO to the Process Control Data Model.
Example
--------
```python
from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer
fledge_opcua_json_to_pcdm_transfromer = AIOJsonToPCDMTransformer(
aio_json_to_pcdm_transfromer = AIOJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
@@ -40,7 +40,7 @@ class AIOJsonToPCDMTransformer(TransformerInterface):
```
Parameters:
data (DataFrame): Dataframe containing the column with Json Fledge data
data (DataFrame): Dataframe containing the column with Json AIO data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
@@ -0,0 +1,114 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, explode, when, lit, expr

from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.spark import OPCUA_SCHEMA


class OPCUAJsonToPCDMTransformer(TransformerInterface):
"""
Converts a Spark Dataframe column containing a json string created by an open source OPC UA publisher to the Process Control Data Model.
Example
--------
```python
from rtdip_sdk.pipelines.transformers import OPCUAJsonToPCDMTransformer
opcua_json_to_pcdm_transformer = OPCUAJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = opcua_json_to_pcdm_transformer.transform()
```
Parameters:
data (DataFrame): Dataframe containing the column with Json OPCUA data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
"""

data: DataFrame
source_column_name: str
status_null_value: str
change_type_value: str

def __init__(
self,
data: DataFrame,
source_column_name: str,
status_null_value: str = "Good",
change_type_value: str = "insert",
) -> None: # NOSONAR
self.data = data
self.source_column_name = source_column_name
self.status_null_value = status_null_value
self.change_type_value = change_type_value

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
return True

def post_transform_validation(self):
return True

def transform(self) -> DataFrame:
"""
Returns:
DataFrame: A dataframe with the specified column converted to PCDM
"""
df = (
self.data.select(
from_json(col(self.source_column_name), "Messages STRING").alias("body")
)
.select(from_json(expr("body.Messages"), OPCUA_SCHEMA).alias("body"))
.selectExpr("inline(body)")
.select(col("Timestamp").alias("EventTime"), explode("Payload"))
.select("EventTime", col("key").alias("TagName"), "value.*")
.withColumn("Status", lit(self.status_null_value))
.withColumn(
"ValueType",
when(col("Value").cast("float").isNotNull(), "float").otherwise(
"string"
),
)
.withColumn("ChangeType", lit(self.change_type_value))
)

return df.select(
"EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
)
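A minimal end-to-end sketch of the new transformer, assuming an active SparkSession bound to `spark` and reusing the sample payload from the unit test below; the import path follows the docstring example and may differ from how the package actually re-exports the class.

```python
# Minimal usage sketch -- assumes `spark` is an active SparkSession and that
# OPCUAJsonToPCDMTransformer is exported as shown in the docstring example.
from rtdip_sdk.pipelines.transformers import OPCUAJsonToPCDMTransformer

raw_json = (
    '{"MessageId":"12345","MessageType":"test","PublisherId":"opcua_pub",'
    '"Messages":[{"DataSetWriterId":12345,'
    '"Timestamp":"2024-05-07T09:54:31.6769914Z",'
    '"Payload":{"tag_1":{"Value":100.2}}}]}'
)
opcua_df = spark.createDataFrame([raw_json], "string").toDF("body")

pcdm_df = OPCUAJsonToPCDMTransformer(
    data=opcua_df, source_column_name="body"
).transform()

# Expected columns: EventTime, TagName, Status, Value, ValueType, ChangeType
pcdm_df.show(truncate=False)
```

Note that `transform()` first extracts `Messages` as a raw string and then parses it against `OPCUA_SCHEMA`, so top-level fields such as `MessageId` and `PublisherId` are dropped before each `Payload` entry is exploded into one PCDM row per tag.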
@@ -68,12 +68,12 @@ def test_aio_json_to_pcdm(spark_session: SparkSession):
schema=expected_schema, data=expected_data
)

eventhub_json_to_aio_transformer = AIOJsonToPCDMTransformer(
aio_json_to_pcdm_transformer = AIOJsonToPCDMTransformer(
data=aio_df, source_column_name="body"
)
actual_df = eventhub_json_to_aio_transformer.transform()
actual_df = aio_json_to_pcdm_transformer.transform()

assert eventhub_json_to_aio_transformer.system_type() == SystemType.PYSPARK
assert isinstance(eventhub_json_to_aio_transformer.libraries(), Libraries)
assert aio_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK
assert isinstance(aio_json_to_pcdm_transformer.libraries(), Libraries)
assert expected_schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
@@ -0,0 +1,79 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

sys.path.insert(0, ".")
from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.opcua_json_to_pcdm import (
OPCUAJsonToPCDMTransformer,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
Libraries,
SystemType,
)

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from dateutil import parser


def test_opcua_json_to_pcdm(spark_session: SparkSession):
opcua_json_data = '{"MessageId":"12345","MessageType":"test","PublisherId":"opcua_pub","Messages":[{"DataSetWriterId":12345,"Timestamp":"2024-05-07T09:54:31.6769914Z","Payload":{"tag_1":{"Value":100.2}}},{"DataSetWriterId":56789,"Timestamp":"2024-05-07T09:54:31.6509972Z","Payload":{"tag_2":{"Value":79}}}]}'
opcua_df: DataFrame = spark_session.createDataFrame(
[opcua_json_data], "string"
).toDF("body")

expected_schema = StructType(
[
StructField("EventTime", TimestampType(), True),
StructField("TagName", StringType(), False),
StructField("Status", StringType(), False),
StructField("Value", StringType(), True),
StructField("ValueType", StringType(), False),
StructField("ChangeType", StringType(), False),
]
)

expected_data = [
{
"TagName": "tag_1",
"Value": "100.2",
"EventTime": parser.parse("2024-05-07T09:54:31.6769914Z"),
"Status": "Good",
"ValueType": "float",
"ChangeType": "insert",
},
{
"TagName": "tag_2",
"Value": "79",
"EventTime": parser.parse("2024-05-07T09:54:31.6509972Z"),
"Status": "Good",
"ValueType": "float",
"ChangeType": "insert",
},
]

expected_df: DataFrame = spark_session.createDataFrame(
schema=expected_schema, data=expected_data
)

opcua_json_to_pcdm_transformer = OPCUAJsonToPCDMTransformer(
data=opcua_df, source_column_name="body"
)
actual_df = opcua_json_to_pcdm_transformer.transform()

assert opcua_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK
assert isinstance(opcua_json_to_pcdm_transformer.libraries(), Libraries)
assert expected_schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
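Both test modules take a `spark_session` fixture as an argument; the repository presumably supplies one via a shared conftest, but for running the test in isolation a minimal stand-in could look like the hypothetical sketch below (fixture scope and Spark settings are assumptions, not taken from the repo).

```python
# Hypothetical conftest.py stand-in for the spark_session fixture used above;
# the repository's own fixture may be configured differently.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("rtdip-opcua-transformer-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()
```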
