Skip to content

Commit

Permalink
HW APM schema change (rtdip#505)
Browse files Browse the repository at this point in the history
* typo change

Signed-off-by: JamesKnBr <[email protected]>

* Add Honeywell transformers

Signed-off-by: JamesKnBr <[email protected]>

* add eventhub destination and pcdm-honeywell

Signed-off-by: JamesKnBr <[email protected]>

* Change HW APM schema

Signed-off-by: JamesKnBr <[email protected]>

---------

Signed-off-by: JamesKnBr <[email protected]>
Signed-off-by: JamesKnBr <[email protected]>
  • Loading branch information
JamesKnBr authored Sep 25, 2023
1 parent 998364d commit 3f5ae1a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
46 changes: 28 additions & 18 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,37 @@ def get_dbutils(

APM_SCHEMA = StructType(
[
StructField("Id", StringType(), True),
StructField("TenantId", StringType(), True),
StructField("IdType", StringType(), True),
StructField(
"Samples",
ArrayType(
StructType(
[
StructField("ItemName", StringType(), True),
StructField("Time", StringType(), True),
StructField("Value", StringType(), True),
StructField("Unit", StringType(), True),
StructField("NormalizedQuality", StringType(), True),
StructField("HighValue", DoubleType(), True),
StructField("LowValue", DoubleType(), True),
StructField("TargetValue", DoubleType(), True),
]
)
"SystemTimeSeries",
StructType(
[
StructField("Id", StringType(), True),
StructField("TenantId", StringType(), True),
StructField("IdType", StringType(), True),
StructField(
"Samples",
ArrayType(
StructType(
[
StructField("ItemName", StringType(), True),
StructField("Time", StringType(), True),
StructField("Value", StringType(), True),
StructField("Unit", StringType(), True),
StructField(
"NormalizedQuality", StringType(), True
),
StructField("HighValue", DoubleType(), True),
StructField("LowValue", DoubleType(), True),
StructField("TargetValue", DoubleType(), True),
]
)
),
True,
),
]
),
True,
),
)
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def transform(self) -> DataFrame:
"""
df = (
self.data.withColumn("body", from_json(self.source_column_name, APM_SCHEMA))
.select(explode("body.Samples"))
.select(explode("body.SystemTimeSeries.Samples"))
.selectExpr("*", "to_timestamp(col.Time) as EventTime")
.withColumn("TagName", col("col.Itemname"))
.withColumn("Status", lit(self.status_null_value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


def test_honeywell_apm_to_pcdm(spark_session: SparkSession):
honeywell_json_data = '{"Id": "testId","TenantId": "testTenantId","IdType": "calculatedpoint","Samples": [{"ItemName": "test.item1", "Time": "2023-07-31T06:53:00+00:00","Value": "5.0","Unit": null,"NormalizedQuality": "good", "HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "test_item2","Time": "2023-07-31T06:53:00+00:00","Value": 0.0,"Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "testItem3","Time": "2023-07-31T06:53:00.205+00:00","Value": "test_string","Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null}]}'
honeywell_json_data = '{"SystemTimeSeries": {"Id": "testId","TenantId": "testTenantId","IdType": "calculatedpoint","Samples": [{"ItemName": "test.item1", "Time": "2023-07-31T06:53:00+00:00","Value": "5.0","Unit": null,"NormalizedQuality": "good", "HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "test_item2","Time": "2023-07-31T06:53:00+00:00","Value": 0.0,"Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null},{"ItemName": "testItem3","Time": "2023-07-31T06:53:00.205+00:00","Value": "test_string","Unit": null,"NormalizedQuality": "good","HighValue": null,"LowValue": null,"TargetValue": null}]}}'
honeywell_df: DataFrame = spark_session.createDataFrame(
[{"body": honeywell_json_data}]
)
Expand Down

0 comments on commit 3f5ae1a

Please sign in to comment.