diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py index bec56a16d..4bc46e50e 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py @@ -252,27 +252,37 @@ def get_dbutils( APM_SCHEMA = StructType( [ - StructField("Id", StringType(), True), - StructField("TenantId", StringType(), True), - StructField("IdType", StringType(), True), StructField( - "Samples", - ArrayType( - StructType( - [ - StructField("ItemName", StringType(), True), - StructField("Time", StringType(), True), - StructField("Value", StringType(), True), - StructField("Unit", StringType(), True), - StructField("NormalizedQuality", StringType(), True), - StructField("HighValue", DoubleType(), True), - StructField("LowValue", DoubleType(), True), - StructField("TargetValue", DoubleType(), True), - ] - ) + "SystemTimeSeries", + StructType( + [ + StructField("Id", StringType(), True), + StructField("TenantId", StringType(), True), + StructField("IdType", StringType(), True), + StructField( + "Samples", + ArrayType( + StructType( + [ + StructField("ItemName", StringType(), True), + StructField("Time", StringType(), True), + StructField("Value", StringType(), True), + StructField("Unit", StringType(), True), + StructField( + "NormalizedQuality", StringType(), True + ), + StructField("HighValue", DoubleType(), True), + StructField("LowValue", DoubleType(), True), + StructField("TargetValue", DoubleType(), True), + ] + ) + ), + True, + ), + ] ), True, - ), + ) ] ) diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py index c6ebd9844..988390e0e 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py @@ -78,7 +78,7 @@ def transform(self) -> DataFrame: """ df = ( self.data.withColumn("body", from_json(self.source_column_name, APM_SCHEMA)) - .select(explode("body.Samples")) + .select(explode("body.SystemTimeSeries.Samples")) .selectExpr("*", "to_timestamp(col.Time) as EventTime") .withColumn("TagName", col("col.Itemname")) .withColumn("Status", lit(self.status_null_value)) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_honeywell_apm_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_honeywell_apm_to_pcdm.py index e66b6274c..01bcacfb3 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_honeywell_apm_to_pcdm.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_honeywell_apm_to_pcdm.py @@ -29,7 +29,7 @@ def test_honeywell_apm_to_pcdm(spark_session: SparkSession): - honeywell_json_data = '{"Id": "testId","TenantId": "testTenantId","IdType": "calculatedpoint","Samples": [{"ItemName": "test.item1", "Time": "2023-07-31T06:53:00+00:00","Value": "5.0","Unit": null,"NormalizedQuality": "good", "HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "test_item2","Time": "2023-07-31T06:53:00+00:00","Value": 0.0,"Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "testItem3","Time": "2023-07-31T06:53:00.205+00:00","Value": "test_string","Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null}]}' + honeywell_json_data = '{"SystemTimeSeries": {"Id": "testId","TenantId": "testTenantId","IdType": "calculatedpoint","Samples": [{"ItemName": "test.item1", "Time": "2023-07-31T06:53:00+00:00","Value": "5.0","Unit": null,"NormalizedQuality": "good", "HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "test_item2","Time": "2023-07-31T06:53:00+00:00","Value": 0.0,"Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "testItem3","Time": "2023-07-31T06:53:00.205+00:00","Value": "test_string","Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null}]}}' honeywell_df: DataFrame = spark_session.createDataFrame( [{"body": honeywell_json_data}] )