diff --git a/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md b/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md new file mode 100644 index 000000000..17dab55cd --- /dev/null +++ b/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md @@ -0,0 +1,2 @@ +# Write Process Control Data Model Latest Values to Delta +::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_latest_to_delta \ No newline at end of file diff --git a/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md b/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md index 26bd9e83d..0ec00cac0 100644 --- a/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md +++ b/docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md @@ -1,2 +1,2 @@ -# Write to Delta +# Write Process Control Data Model to Delta ::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_to_delta \ No newline at end of file diff --git a/docs/sdk/pipelines/components.md b/docs/sdk/pipelines/components.md index 916e1daaa..e0643429a 100644 --- a/docs/sdk/pipelines/components.md +++ b/docs/sdk/pipelines/components.md @@ -83,6 +83,7 @@ Destinations are components that connect to sink/destination systems and write d |[Kinesis](../code-reference/pipelines/destinations/spark/kafka.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[Rest API](../code-reference/pipelines/destinations/spark/rest_api.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[Process Control Data Model To Delta](../code-reference/pipelines/destinations/spark/pcdm_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| +|[Process Control Data Model Latest Values To Delta](../code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[EVM](../code-reference/pipelines/destinations/blockchain/evm.md)|:heavy_check_mark:|||:heavy_check_mark:|:heavy_check_mark:| !!! 
note "Note" diff --git a/mkdocs.yml b/mkdocs.yml index 41a21c93c..da71a083c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -209,7 +209,8 @@ nav: - Kafka: sdk/code-reference/pipelines/destinations/spark/kafka.md - Kinesis: sdk/code-reference/pipelines/destinations/spark/kinesis.md - Rest API: sdk/code-reference/pipelines/destinations/spark/rest_api.md - - Process Control Data Model To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md + - PCDM To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md + - PCDM Latest To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md - Python: - Delta: sdk/code-reference/pipelines/destinations/python/delta.md - Blockchain: diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py index 759ff993a..618e86ba6 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py @@ -19,6 +19,7 @@ from .spark.kinesis import * from .spark.rest_api import * from .spark.pcdm_to_delta import * +from .spark.pcdm_latest_to_delta import * from .spark.kafka_eventhub import * from .blockchain.evm import * from .python.delta import * diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py new file mode 100644 index 000000000..b2ca64cfa --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py @@ -0,0 +1,235 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.functions import col, when, struct, max +from pyspark.sql import Window +from py4j.protocol import Py4JJavaError + +from ..interfaces import DestinationInterface +from .delta import SparkDeltaDestination +from .delta_merge import ( + SparkDeltaMergeDestination, + DeltaMergeCondition, + DeltaMergeConditionValues, +) +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.constants import get_default_package + + +class ValueTypeConstants: + INTEGER_VALUE = "ValueType = 'integer'" + FLOAT_VALUE = "ValueType = 'float'" + STRING_VALUE = "ValueType = 'string'" + + +class SparkPCDMLatestToDeltaDestination(DestinationInterface): + """ + The Process Control Data Model Latest Values written to Delta + + Args: + data (DataFrame): Dataframe to be merged into a Delta Table + options (dict): Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available for [batch](https://docs.delta.io/latest/delta-batch.html#write-to-a-table){ target="_blank" } and [streaming](https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink){ target="_blank" }. 
+ destination (str): Either the name of the Hive Metastore or Unity Catalog Delta Table **or** the path to the Delta table to store the latest values + mode (str): Method of writing to Delta Table - append/overwrite (batch), append/complete (stream) + trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds + query_name (str): Unique name for the query in associated SparkSession + query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None + + Attributes: + checkpointLocation (str): Path to checkpoint files. (Streaming) + """ + + spark: SparkSession + data: DataFrame + options: dict + destination: str + mode: str + trigger: str + query_name: str + query_wait_interval: int + + def __init__( + self, + spark: SparkSession, + data: DataFrame, + options: dict, + destination: str, + mode: str = None, + trigger="10 seconds", + query_name: str = "PCDMLatestToDeltaDestination", + query_wait_interval: int = None, + ) -> None: + self.spark = spark + self.data = data + self.destination = destination + self.options = options + self.mode = mode + self.trigger = trigger + self.query_name = query_name + self.query_wait_interval = query_wait_interval + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + libraries.add_maven_library(get_default_package("spark_delta_core")) + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_write_validation(self): + return True + + def post_write_validation(self): + return True + + def _write_latest_to_delta(self, df: DataFrame, epoch_id=None): # NOSONAR + df.persist() + + latest_df = ( + df.withColumn( + "Latest", + max(struct("EventTime", "Status")).over(Window.partitionBy("TagName")), + ) + .withColumn( + "GoodLatest", + when( + col("Latest.Status") == "Good", + struct(col("EventTime"), col("Value"), col("ValueType")), + ).otherwise( + max( + when( + col("Status") == "Good", + struct("EventTime", "Value", "ValueType"), + ) + ).over(Window.partitionBy("TagName")) + ), + ) + .filter(col("EventTime") == col("Latest.EventTime")) + .drop("Latest") + .dropDuplicates(["TagName"]) + ) + + when_matched_update_list = [ + DeltaMergeConditionValues( + condition="source.EventTime > target.EventTime", + values={ + "EventDate": "source.EventDate", + "EventTime": "source.EventTime", + "Status": "source.Status", + "Value": "source.Value", + "ValueType": "source.ValueType", + }, + ), + DeltaMergeConditionValues( + condition="source.GoodLatest.EventTime IS NOT NULL AND source.GoodLatest.EventTime > target.GoodEventTime", + values={ + "GoodEventTime": "source.GoodLatest.EventTime", + "GoodValue": "source.GoodLatest.Value", + "GoodValueType": "source.GoodLatest.ValueType", + }, + ), + ] + + when_not_matched_insert_list = [ + DeltaMergeConditionValues( + values={ + "EventDate": "source.EventDate", + "TagName": "source.TagName", + "EventTime": "source.EventTime", + "Status": "source.Status", + "Value": "source.Value", + "ValueType": "source.ValueType", + "GoodEventTime": "source.GoodLatest.EventTime", + "GoodValue": "source.GoodLatest.Value", + "GoodValueType": "source.GoodLatest.ValueType", + }, + ) + ] + + merge_condition = "source.TagName = 
target.TagName" + + SparkDeltaMergeDestination( + spark=self.spark, + data=latest_df, + destination=self.destination, + options=self.options, + merge_condition=merge_condition, + when_matched_update_list=when_matched_update_list, + when_not_matched_insert_list=when_not_matched_insert_list, + trigger=self.trigger, + query_name=self.query_name, + ).write_batch() + + df.unpersist() + + def write_batch(self): + """ + Writes Process Control Data Model data to Delta + """ + try: + self._write_latest_to_delta(self.data) + + except Py4JJavaError as e: + logging.exception(e.errmsg) + raise e + except Exception as e: + logging.exception(str(e)) + raise e + + def write_stream(self): + """ + Writes streaming Process Control Data Model data to Delta using foreachBatch + """ + try: + TRIGGER_OPTION = ( + {"availableNow": True} + if self.trigger == "availableNow" + else {"processingTime": self.trigger} + ) + + query = ( + self.data.writeStream.trigger(**TRIGGER_OPTION) + .format("delta") + .foreachBatch(self._write_latest_to_delta) + .queryName(self.query_name) + .outputMode("append") + .options(**self.options) + .start() + ) + + if self.query_wait_interval: + while query.isActive: + if query.lastProgress: + logging.info(query.lastProgress) + time.sleep(self.query_wait_interval) + + except Py4JJavaError as e: + logging.exception(e.errmsg) + raise e + except Exception as e: + logging.exception(str(e)) + raise e diff --git a/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_latest_to_delta.py b/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_latest_to_delta.py new file mode 100644 index 000000000..c906df9a7 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_latest_to_delta.py @@ -0,0 +1,289 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys + +sys.path.insert(0, ".") +import pytest +from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import _get_package_version +from src.sdk.python.rtdip_sdk.pipelines.destinations import ( + SparkPCDMLatestToDeltaDestination, +) +from src.sdk.python.rtdip_sdk.pipelines.utilities.spark.delta_table_create import ( + DeltaTableCreateUtility, + DeltaTableColumn, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( + Libraries, + MavenLibrary, +) +from pyspark.sql import SparkSession +from pytest_mock import MockerFixture +from datetime import datetime + + +class MockDeltaDestination: + def write_stream(self): + return None + + +class TestStreamingQueryClass: + isActive: bool = False # NOSONAR + + +def create_delta_table(spark_session, name): + table_create_utility = DeltaTableCreateUtility( + spark=spark_session, + table_name=name, + columns=[ + DeltaTableColumn(name="EventDate", type="date", nullable=True), + DeltaTableColumn(name="TagName", type="string", nullable=True), + DeltaTableColumn(name="EventTime", type="timestamp", nullable=True), + DeltaTableColumn(name="Status", type="string", nullable=True), + DeltaTableColumn(name="Value", type="string", nullable=True), + DeltaTableColumn(name="ValueType", type="string", nullable=True), + DeltaTableColumn(name="GoodEventTime", type="timestamp", nullable=True), + DeltaTableColumn(name="GoodValue", type="string", nullable=True), + DeltaTableColumn(name="GoodValueType", type="string", nullable=True), + ], + properties={ + "delta.logRetentionDuration": "7 days", + "delta.enableChangeDataFeed": "true", + }, + ) + + table_create_utility.execute() + + +def test_spark_pcdm_latest_to_delta_write_setup(spark_session: SparkSession): + pcdm_latest_o_delta_destination = SparkPCDMLatestToDeltaDestination( + spark_session, + None, + {}, + "test_delta_latest_destination_setup", + ) + assert pcdm_latest_o_delta_destination.system_type().value == 2 + assert pcdm_latest_o_delta_destination.libraries() == Libraries( + maven_libraries=[ + MavenLibrary( + group_id="io.delta", + artifact_id="delta-core_2.12", + version=_get_package_version("delta-spark"), + ) + ], + pypi_libraries=[], + pythonwheel_libraries=[], + ) + assert isinstance(pcdm_latest_o_delta_destination.settings(), dict) + assert pcdm_latest_o_delta_destination.pre_write_validation() + assert pcdm_latest_o_delta_destination.post_write_validation() + + +def test_spark_pcdm_latest_to_delta_write_batch(spark_session: SparkSession): + create_delta_table(spark_session, "test_spark_pcdm_latest_to_delta_write_batch") + + test_df = spark_session.createDataFrame( + [ + { + "EventDate": datetime(2023, 1, 20).date(), + "TagName": "Tag1", + "EventTime": datetime(2023, 1, 20, 1, 0), + "Status": "Good", + "Value": "1.01", + "ValueType": "float", + "ChangeType": "insert", + }, + { + "EventDate": datetime(2023, 1, 20).date(), + "TagName": "Tag1", + "EventTime": datetime(2023, 1, 20, 1, 1), + "Status": "Good", + "Value": "1.02", + "ValueType": "float", + "ChangeType": "insert", + }, + ] + ) + + expected_df = spark_session.createDataFrame( + [ + { + "EventDate": datetime(2023, 1, 20).date(), + "TagName": "Tag1", + "EventTime": datetime(2023, 1, 20, 1, 1), + "Status": "Good", + "Value": "1.02", + "ValueType": "float", + "GoodEventTime": datetime(2023, 1, 20, 1, 1), + "GoodValue": "1.02", + "GoodValueType": "float", + } + ] + ).select( + "EventDate", + "TagName", + "EventTime", + "Status", + "Value", + "ValueType", + "GoodEventTime", + "GoodValue", + "GoodValueType", + ) + + 
pcdm_latest_to_delta_destination = SparkPCDMLatestToDeltaDestination( + spark_session, + test_df, + {}, + "test_spark_pcdm_latest_to_delta_write_batch", + ) + pcdm_latest_to_delta_destination.write_batch() + + actual_df = spark_session.table("test_spark_pcdm_latest_to_delta_write_batch") + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + + +def test_spark_pcdm_to_delta_write_stream_merge( + spark_session: SparkSession, mocker: MockerFixture +): + mocker.patch( + "pyspark.sql.DataFrame.writeStream", + new_callable=mocker.Mock( + return_value=mocker.Mock( + trigger=mocker.Mock( + return_value=mocker.Mock( + format=mocker.Mock( + return_value=mocker.Mock( + foreachBatch=mocker.Mock( + return_value=mocker.Mock( + queryName=mocker.Mock( + return_value=mocker.Mock( + outputMode=mocker.Mock( + return_value=mocker.Mock( + options=mocker.Mock( + return_value=mocker.Mock( + start=mocker.Mock( + return_value=TestStreamingQueryClass() + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ), + ) + expected_df = spark_session.createDataFrame( + [ + { + "EventDate": datetime(2023, 1, 20).date(), + "TagName": "Tag1", + "EventTime": datetime(2023, 1, 20, 1, 0), + "Status": "Good", + "Value": "1.02", + "ValueType": "float", + "ChangeType": "insert", + }, + ] + ) + pcdm_latest_to_delta_destination = SparkPCDMLatestToDeltaDestination( + spark_session, + expected_df, + {}, + "test_spark_pcdm_to_delta_write_stream", + ) + actual = pcdm_latest_to_delta_destination.write_stream() + assert actual is None + + +def test_spark_pcdm_latest_to_delta_write_batch_fails( + spark_session: SparkSession, mocker: MockerFixture +): + mocker.patch( + "src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_latest_to_delta", + new_callable=mocker.Mock( + return_value=mocker.Mock( + SparkDeltaMergeDestination=mocker.Mock( + return_value=mocker.Mock( + _write_data_by_type=mocker.Mock(side_effect=Exception) + ) + ) + ) + ), + ) + expected_df = spark_session.createDataFrame([{"id": "1"}]) + pcdm_to_delta_destination = SparkPCDMLatestToDeltaDestination( + spark_session, + expected_df, + {}, + "test_spark_pcdm_latest_to_delta_write_batch", + ) + with pytest.raises(Exception): + pcdm_to_delta_destination.write_batch() + + +def test_spark_pcdm_to_delta_write_stream_fails( + spark_session: SparkSession, mocker: MockerFixture +): + mocker.patch( + "pyspark.sql.DataFrame.writeStream", + new_callable=mocker.Mock( + return_value=mocker.Mock( + trigger=mocker.Mock( + return_value=mocker.Mock( + format=mocker.Mock( + return_value=mocker.Mock( + foreachBatch=mocker.Mock( + return_value=mocker.Mock( + queryName=mocker.Mock( + return_value=mocker.Mock( + outputMode=mocker.Mock( + return_value=mocker.Mock( + options=mocker.Mock( + return_value=mocker.Mock( + start=mocker.Mock( + side_effect=Exception + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ) + ), + ) + expected_df = spark_session.createDataFrame([{"id": "1"}]) + pcdm_to_delta_destination = SparkPCDMLatestToDeltaDestination( + spark_session, + expected_df, + {}, + "test_spark_pcdm_latest_to_delta_write_stream", + ) + with pytest.raises(Exception): + pcdm_to_delta_destination.write_stream() diff --git a/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_to_delta.py b/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_to_delta.py index 96430647a..6427057c0 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_to_delta.py +++ 
b/tests/sdk/python/rtdip_sdk/pipelines/destinations/spark/test_pcdm_to_delta.py @@ -17,11 +17,6 @@ sys.path.insert(0, ".") import pytest from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import _get_package_version -from src.sdk.python.rtdip_sdk.pipelines.destinations.spark.delta_merge import ( - SparkDeltaMergeDestination, - DeltaMergeCondition, - DeltaMergeConditionValues, -) from src.sdk.python.rtdip_sdk.pipelines.destinations.spark.delta import ( SparkDeltaDestination, )
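Reviewer note: below is a minimal usage sketch of the new `SparkPCDMLatestToDeltaDestination` component introduced in this diff, based only on the constructor arguments and the `write_batch`/`write_stream` methods shown above. The import path follows the `destinations/__init__.py` export added here; the Spark session setup, source DataFrame, Delta table name, and checkpoint location are illustrative placeholders and are not part of this change.

```python
# Usage sketch (not part of this diff). Assumes a SparkSession already configured
# with the Delta Lake packages and a DataFrame in the Process Control Data Model
# schema: EventDate, TagName, EventTime, Status, Value, ValueType, ChangeType.
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination

spark = SparkSession.builder.appName("pcdm-latest-example").getOrCreate()

# Placeholder source of PCDM events; replace with a real source component.
pcdm_df = spark.read.format("delta").load("/data/pcdm_events")

latest_destination = SparkPCDMLatestToDeltaDestination(
    spark=spark,
    data=pcdm_df,
    options={"checkpointLocation": "/checkpoints/pcdm_latest"},  # used by write_stream
    destination="process_control_latest_values",  # Delta table name or path (placeholder)
    trigger="10 seconds",  # streaming trigger interval
    query_name="PCDMLatestToDeltaDestination",
)

# Batch: merges the latest value per TagName (and the latest "Good" value) into the table.
latest_destination.write_batch()

# Streaming: applies the same merge on each micro-batch via foreachBatch.
# latest_destination.write_stream()
```

As defined in `_write_latest_to_delta`, the merge maintains two sets of columns per tag: EventTime/Value/ValueType for the most recent event regardless of Status, and GoodEventTime/GoodValue/GoodValueType for the most recent event whose Status is "Good", matching the when-matched and when-not-matched conditions in this change.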