PCDM Delta Latest Values to Delta (rtdip#512)
Signed-off-by: GBBBAS <[email protected]>
GBBBAS authored Sep 29, 2023
1 parent 3548656 commit 329115f
Showing 8 changed files with 531 additions and 7 deletions.
docs/sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md
@@ -0,0 +1,2 @@
# Write Process Control Data Model Latest Values to Delta
::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_latest_to_delta
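For orientation, a minimal batch usage sketch; the SparkSession `spark`, the DataFrame `pcdm_df` and the destination table name below are illustrative assumptions, not part of the generated reference above:

```python
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination

# pcdm_df is assumed to be PCDM-shaped, i.e. to contain the columns
# TagName, EventDate, EventTime, Status, Value and ValueType
SparkPCDMLatestToDeltaDestination(
    spark=spark,
    data=pcdm_df,
    options={},
    destination="example_catalog.example_schema.latest_values",  # assumed table name
).write_batch()
```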
docs/sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md
@@ -1,2 +1,2 @@
- # Write to Delta
+ # Write Process Control Data Model to Delta
::: src.sdk.python.rtdip_sdk.pipelines.destinations.spark.pcdm_to_delta
1 change: 1 addition & 0 deletions docs/sdk/pipelines/components.md
@@ -83,6 +83,7 @@ Destinations are components that connect to sink/destination systems and write data
|[Kinesis](../code-reference/pipelines/destinations/spark/kinesis.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[Rest API](../code-reference/pipelines/destinations/spark/rest_api.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[Process Control Data Model To Delta](../code-reference/pipelines/destinations/spark/pcdm_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
+ |[Process Control Data Model Latest Values To Delta](../code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md)|||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[EVM](../code-reference/pipelines/destinations/blockchain/evm.md)|:heavy_check_mark:|||:heavy_check_mark:|:heavy_check_mark:|

!!! note "Note"
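To make the new table row concrete: the component advertises its runtime requirements through static methods added in this commit, so its Apache Spark/Databricks-only support can be checked programmatically. A small sketch (the `maven_libraries` attribute name is assumed from the `add_maven_library` call in the source; output comments are indicative only):

```python
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination

# Matches the Apache Spark / Databricks columns in the table above
print(SparkPCDMLatestToDeltaDestination.system_type())  # SystemType.PYSPARK
# Maven coordinates of the Delta Lake package required at runtime
print(SparkPCDMLatestToDeltaDestination.libraries().maven_libraries)
```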
3 changes: 2 additions & 1 deletion mkdocs.yml
@@ -209,7 +209,8 @@ nav:
        - Kafka: sdk/code-reference/pipelines/destinations/spark/kafka.md
        - Kinesis: sdk/code-reference/pipelines/destinations/spark/kinesis.md
        - Rest API: sdk/code-reference/pipelines/destinations/spark/rest_api.md
-       - Process Control Data Model To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md
+       - PCDM To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md
+       - PCDM Latest To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_latest_to_delta.md
      - Python:
        - Delta: sdk/code-reference/pipelines/destinations/python/delta.md
      - Blockchain:
src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py
@@ -19,6 +19,7 @@
from .spark.kinesis import *
from .spark.rest_api import *
from .spark.pcdm_to_delta import *
+ from .spark.pcdm_latest_to_delta import *
from .spark.kafka_eventhub import *
from .blockchain.evm import *
from .python.delta import *
src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py
@@ -0,0 +1,235 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, when, struct, max
from pyspark.sql import Window
from py4j.protocol import Py4JJavaError

from ..interfaces import DestinationInterface
from .delta import SparkDeltaDestination
from .delta_merge import (
    SparkDeltaMergeDestination,
    DeltaMergeCondition,
    DeltaMergeConditionValues,
)
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.constants import get_default_package


class ValueTypeConstants:
    INTEGER_VALUE = "ValueType = 'integer'"
    FLOAT_VALUE = "ValueType = 'float'"
    STRING_VALUE = "ValueType = 'string'"


class SparkPCDMLatestToDeltaDestination(DestinationInterface):
    """
    Writes the latest values of the Process Control Data Model to Delta.

    Args:
        spark (SparkSession): Spark Session required to write to the Delta table
        data (DataFrame): Dataframe to be merged into a Delta Table
        options (dict): Options that can be specified for a Delta Table write operation (See Attributes table below). Further information on the options is available for [batch](https://docs.delta.io/latest/delta-batch.html#write-to-a-table){ target="_blank" } and [streaming](https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink){ target="_blank" }.
        destination (str): Either the name of the Hive Metastore or Unity Catalog Delta Table **or** the path to the Delta table to store the latest values
        mode (str): Method of writing to Delta Table - append/overwrite (batch), append/complete (stream)
        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is "10 seconds"
        query_name (str): Unique name for the query in the associated SparkSession
        query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None

    Attributes:
        checkpointLocation (str): Path to checkpoint files. (Streaming)
    """

    spark: SparkSession
    data: DataFrame
    options: dict
    destination: str
    mode: str
    trigger: str
    query_name: str
    query_wait_interval: int

    def __init__(
        self,
        spark: SparkSession,
        data: DataFrame,
        options: dict,
        destination: str,
        mode: str = None,
        trigger: str = "10 seconds",
        query_name: str = "PCDMLatestToDeltaDestination",
        query_wait_interval: int = None,
    ) -> None:
        self.spark = spark
        self.data = data
        self.destination = destination
        self.options = options
        self.mode = mode
        self.trigger = trigger
        self.query_name = query_name
        self.query_wait_interval = query_wait_interval

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self):
        return True

    def post_write_validation(self):
        return True

    def _write_latest_to_delta(self, df: DataFrame, epoch_id=None):  # NOSONAR
        df.persist()

        # Determine the most recent event per tag and, separately, the most
        # recent event per tag with a "Good" status, keeping one row per tag
        latest_df = (
            df.withColumn(
                "Latest",
                max(struct("EventTime", "Status")).over(Window.partitionBy("TagName")),
            )
            .withColumn(
                "GoodLatest",
                when(
                    col("Latest.Status") == "Good",
                    struct(col("EventTime"), col("Value"), col("ValueType")),
                ).otherwise(
                    max(
                        when(
                            col("Status") == "Good",
                            struct("EventTime", "Value", "ValueType"),
                        )
                    ).over(Window.partitionBy("TagName"))
                ),
            )
            .filter(col("EventTime") == col("Latest.EventTime"))
            .drop("Latest")
            .dropDuplicates(["TagName"])
        )

        # Update the stored latest values only when the incoming event is newer
        when_matched_update_list = [
            DeltaMergeConditionValues(
                condition="source.EventTime > target.EventTime",
                values={
                    "EventDate": "source.EventDate",
                    "EventTime": "source.EventTime",
                    "Status": "source.Status",
                    "Value": "source.Value",
                    "ValueType": "source.ValueType",
                },
            ),
            DeltaMergeConditionValues(
                condition="source.GoodLatest.EventTime IS NOT NULL AND source.GoodLatest.EventTime > target.GoodEventTime",
                values={
                    "GoodEventTime": "source.GoodLatest.EventTime",
                    "GoodValue": "source.GoodLatest.Value",
                    "GoodValueType": "source.GoodLatest.ValueType",
                },
            ),
        ]

        # Insert tags that do not yet exist in the target table
        when_not_matched_insert_list = [
            DeltaMergeConditionValues(
                values={
                    "EventDate": "source.EventDate",
                    "TagName": "source.TagName",
                    "EventTime": "source.EventTime",
                    "Status": "source.Status",
                    "Value": "source.Value",
                    "ValueType": "source.ValueType",
                    "GoodEventTime": "source.GoodLatest.EventTime",
                    "GoodValue": "source.GoodLatest.Value",
                    "GoodValueType": "source.GoodLatest.ValueType",
                },
            )
        ]

        merge_condition = "source.TagName = target.TagName"

        SparkDeltaMergeDestination(
            spark=self.spark,
            data=latest_df,
            destination=self.destination,
            options=self.options,
            merge_condition=merge_condition,
            when_matched_update_list=when_matched_update_list,
            when_not_matched_insert_list=when_not_matched_insert_list,
            trigger=self.trigger,
            query_name=self.query_name,
        ).write_batch()

        df.unpersist()

    def write_batch(self):
        """
        Writes Process Control Data Model data to Delta
        """
        try:
            self._write_latest_to_delta(self.data)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming Process Control Data Model data to Delta using foreachBatch
        """
        try:
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )

            query = (
                self.data.writeStream.trigger(**TRIGGER_OPTION)
                .format("delta")
                .foreachBatch(self._write_latest_to_delta)
                .queryName(self.query_name)
                .outputMode("append")
                .options(**self.options)
                .start()
            )

            if self.query_wait_interval:
                while query.isActive:
                    if query.lastProgress:
                        logging.info(query.lastProgress)
                    time.sleep(self.query_wait_interval)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
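Because the class routes streaming data through `foreachBatch`, each micro-batch is first reduced to one row per `TagName` by `_write_latest_to_delta` and then merged into the target table. A hedged streaming sketch; the stream source, checkpoint path and table name are illustrative assumptions:

```python
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination

# streaming_pcdm_df is assumed to be a streaming PCDM-shaped DataFrame,
# e.g. produced by an Eventhub or Kafka source upstream in the pipeline
SparkPCDMLatestToDeltaDestination(
    spark=spark,
    data=streaming_pcdm_df,
    options={"checkpointLocation": "/tmp/checkpoints/pcdm_latest"},  # assumed path
    destination="example_catalog.example_schema.latest_values",  # assumed table name
    trigger="30 seconds",
    query_name="ExamplePCDMLatestQuery",
).write_stream()
```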