From 65bfce27da300b6057ef309fce25b19176313893 Mon Sep 17 00:00:00 2001 From: GBBBAS <42962356+GBBBAS@users.noreply.github.com> Date: Wed, 19 Jun 2024 20:11:42 +0100 Subject: [PATCH] Update to PCDM logic to cater for Null EventTimes (#764) * Update to PCDM logic Signed-off-by: GBBBAS * Update to condition logic Signed-off-by: GBBBAS --------- Signed-off-by: GBBBAS --- .../pipelines/destinations/spark/pcdm_latest_to_delta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py index a271aa0e9..a832ca3d5 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py @@ -185,7 +185,7 @@ def _write_latest_to_delta(self, df: DataFrame, epoch_id=None): # NOSONAR }, ), DeltaMergeConditionValues( - condition="source.EventTime > target.EventTime AND source.GoodLatest.EventTime IS NOT NULL AND source.GoodLatest.EventTime > target.GoodEventTime", + condition="source.EventTime > target.EventTime AND (source.GoodLatest.EventTime IS NOT NULL AND (source.GoodLatest.EventTime > target.GoodEventTime OR target.GoodEventTime IS NULL))", values={ "EventTime": "source.EventTime", "Status": "source.Status", @@ -197,7 +197,7 @@ def _write_latest_to_delta(self, df: DataFrame, epoch_id=None): # NOSONAR }, ), DeltaMergeConditionValues( - condition="source.EventTime <= target.EventTime AND source.GoodLatest.EventTime IS NOT NULL AND source.GoodLatest.EventTime > target.GoodEventTime", + condition="source.EventTime <= target.EventTime AND (source.GoodLatest.EventTime IS NOT NULL AND (source.GoodLatest.EventTime > target.GoodEventTime OR target.GoodEventTime IS NULL))", values={ "GoodEventTime": "source.GoodLatest.EventTime", "GoodValue": "source.GoodLatest.Value",