From 3e78b7b7f0648a939f85bcb13122f40844c1f520 Mon Sep 17 00:00:00 2001 From: cching95 <73163191+cching95@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:23:53 +0100 Subject: [PATCH] Add UoM to TimeseriesQueryBuilder (#824) * add uom to timeseries query builder Signed-off-by: Chloe Ching * update time weighted average query to use new metadata source Signed-off-by: Chloe Ching * updated queries to parametertise tagname column Signed-off-by: Chloe Ching --------- Signed-off-by: Chloe Ching --- .../time_series/_time_series_query_builder.py | 129 ++++++++++-- .../queries/time_series/interpolate.py | 1 - .../time_series/time_series_query_builder.py | 187 +++++++++++++++++- 3 files changed, 290 insertions(+), 27 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py index dad49a2c4..085941859 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py @@ -47,7 +47,12 @@ def _raw_query(parameters_dict: dict) -> str: ") " "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(e.`EventTime`, e.`TagName`, e.`Status`, e.`Value`, m.`UOM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}e.`EventTime`, e.`TagName`, e.`Status`, e.`Value`, m.`UOM`{% endif %} FROM raw_events e ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON e.`TagName` = m.`TagName` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON e.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON e.`{{ 
tagname_column }}` = m.`{{ tagname_column }}` " + "{% endif %}" "{% else %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM raw_events ' "{% endif %}" @@ -61,6 +66,7 @@ def _raw_query(parameters_dict: dict) -> str: raw_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -92,6 +98,10 @@ def _raw_query(parameters_dict: dict) -> str: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -165,7 +175,12 @@ def _sample_query(parameters_dict: dict) -> tuple: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` " + "{% endif %}" "{% else %}" 'SELECT {% if to_json is defined and 
to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ' "{% endif %}" @@ -180,6 +195,7 @@ def _sample_query(parameters_dict: dict) -> tuple: sample_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -217,6 +233,10 @@ def _sample_query(parameters_dict: dict) -> tuple: "case_insensitivity_tag_search", False ), "display_uom": parameters_dict.get("display_uom", False), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -270,7 +290,12 @@ def _plot_query(parameters_dict: dict) -> tuple: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` " + "{% endif %}" "{% else %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", 
"yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ' "{% endif %}" @@ -285,6 +310,7 @@ def _plot_query(parameters_dict: dict) -> tuple: plot_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -321,6 +347,10 @@ def _plot_query(parameters_dict: dict) -> tuple: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -378,7 +408,12 @@ def _interpolation_query( "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true 
%}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' "{% endif %}" @@ -470,7 +505,12 @@ def _interpolation_at_time(parameters_dict: dict) -> str: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' "{% endif %}" @@ -485,6 +525,7 @@ def _interpolation_at_time(parameters_dict: dict) -> str: interpolation_at_time_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -519,6 +560,10 @@ def 
_interpolation_at_time(parameters_dict: dict) -> str: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } sql_template = Template(interpolate_at_time_query) @@ -587,7 +632,12 @@ def _latest_query(parameters_dict: dict) -> str: "ORDER BY `{{ tagname_column }}` ) " "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(l.*, m.`UoM), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}l.*, m.`UoM`{% endif %} FROM latest l ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON l.`TagName` = m.`TagName` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON l.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON l.`{{ tagname_column }}` = m.`{{ tagname_column }}` " + "{% endif %}" "{% else %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM latest ' "{% endif %}" @@ -601,6 +651,7 @@ def _latest_query(parameters_dict: dict) -> str: latest_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -613,6 +664,10 @@ def _latest_query(parameters_dict: dict) -> str: "case_insensitivity_tag_search": 
parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -652,13 +707,20 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: ',fill_status AS (SELECT *, last_value(`{{ status_column }}`, true) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Fill_{{ status_column }}`, CASE WHEN `Fill_{{ status_column }}` <> "Bad" THEN `{{ value_column }}` ELSE null END AS `Good_{{ value_column }}` FROM window_events) ' ",fill_value AS (SELECT *, last_value(`Good_{{ value_column }}`, true) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Fill_{{ value_column }}` FROM fill_status) " '{% if step is defined and step == "metadata" %} ' - ",fill_step AS (SELECT *, IFNULL(Step, false) AS Step FROM fill_value f LEFT JOIN " - "{% if source_metadata is defined and source_metadata is not none %}" - "`{{ source_metadata|lower }}` " + ",fill_step AS (SELECT *, IFNULL(Step, false) AS Step FROM fill_value f " + "LEFT JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON f.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}`) " "{% else %}" - "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` " - "{% endif %}" - "m ON f.`{{ tagname_column }}` = m.`{{ tagname_column }}`) " + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON f.`{{ tagname_column }}` = m.`{{ tagname_column }}`) " + "{% endif %}" + # "LEFT JOIN " + # "{% if source_metadata is defined and source_metadata is not none %}" + # "`{{ source_metadata|lower }}` " + # "{% 
else %}" + # "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` " + # "{% endif %}" + # "m ON f.`{{ tagname_column }}` = m.`{{ tagname_column }}`) " "{% else %}" ",fill_step AS (SELECT *, {{ step }} AS Step FROM fill_value) " "{% endif %}" @@ -687,7 +749,12 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' "{% endif %}" @@ -702,6 +769,7 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: time_weighted_average_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "source_metadata": parameters_dict.get("source_metadata", None), 
"business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), @@ -742,6 +810,10 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -794,7 +866,12 @@ def _circular_stats_query(parameters_dict: dict) -> str: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.*, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.*, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' "{% endif %}" @@ -827,7 +904,12 @@ def _circular_stats_query(parameters_dict: dict) -> str: "{% else %}" "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if 
to_json is defined and to_json == true %}to_json(struct(p.*, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.*, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' "{% endif %}" @@ -842,6 +924,7 @@ def _circular_stats_query(parameters_dict: dict) -> str: circular_stats_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -879,6 +962,10 @@ def _circular_stats_query(parameters_dict: dict) -> str: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } @@ -912,7 +999,12 @@ def _summary_query(parameters_dict: dict) -> str: 
"GROUP BY `{{ tagname_column }}`) " "{% if display_uom is defined and display_uom == true %}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(s.*, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}s.*, m.`UoM`{% endif %} FROM summary s ' - "LEFT OUTER JOIN `{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON s.`TagName` = m.`TagName` " + "LEFT OUTER JOIN " + "{% if metadata_source is defined and metadata_source is not none %}" + "`{{ metadata_source|lower }}` m ON s.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + "{% else %}" + "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON s.`{{ tagname_column }}` = m.`{{ tagname_column }}` " + "{% endif %}" "{% else%}" 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM summary ' "{% endif %}" @@ -926,6 +1018,7 @@ def _summary_query(parameters_dict: dict) -> str: summary_parameters = { "source": parameters_dict.get("source", None), + "metadata_source": parameters_dict.get("metadata_source", None), "business_unit": parameters_dict.get("business_unit"), "region": parameters_dict.get("region"), "asset": parameters_dict.get("asset"), @@ -957,6 +1050,10 @@ def _summary_query(parameters_dict: dict) -> str: "case_insensitivity_tag_search": parameters_dict.get( "case_insensitivity_tag_search", False ), + "metadata_tagname_column": parameters_dict.get( + "metadata_tagname_column", "TagName" + ), + "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json": parameters_dict.get("to_json", False), } diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py index 130d0a0aa..354b74a31 100644 --- 
a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py @@ -14,7 +14,6 @@ import logging import pandas as pd -import sys from ._time_series_query_builder import _query_builder diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py index 7e9fb3944..9109ddaab 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py @@ -43,6 +43,14 @@ class TimeSeriesQueryBuilder: timestamp_column: str status_column: str value_column: str + metadata_source: str + metadata_tagname_column: str + metadata_uom_column: str + + def __init__(self): + self.metadata_source = None + self.metadata_tagname_column = None + self.metadata_uom_column = None def connect(self, connection: ConnectionInterface): """ @@ -116,6 +124,49 @@ def source( self.value_column = value_column return self + def m_source( + self, + metadata_source: str, + metadata_tagname_column: str = "TagName", + metadata_uom_column: str = "UoM", + ): + """ + Specifies the Metadata source of the query. This is only required if display_uom is set to True or Step is set to "metadata". Otherwise, it is optional. 
+ + **Example:** + ```python + from rtdip_sdk.authentication.azure import DefaultAuth + from rtdip_sdk.connectors import DatabricksSQLConnection + from rtdip_sdk.queries import TimeSeriesQueryBuilder + + auth = DefaultAuth().authenticate() + token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token + connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token) + + source = ( + TimeSeriesQueryBuilder() + .connect(connection) + .source( + source="{tablename_or_path}" + ) + .m_source( + metadata_source="{metadata_table_or_path}", + metadata_tagname_column="TagName", + metadata_uom_column="UoM") + ) + + ``` + + Args: + metadata_source (str): Source of the query can be a Unity Catalog table, Hive metastore table or path + metadata_tagname_column (optional str): The column name in the source that contains the tagnames or series + metadata_uom_column (optional str): The column name in the source that contains the unit of measure + """ + self.metadata_source = "`.`".join(metadata_source.split(".")) + self.metadata_tagname_column = metadata_tagname_column + self.metadata_uom_column = metadata_uom_column + return self + def raw( self, tagname_filter: [str], @@ -159,7 +210,7 @@ def raw( start_date (str): Start date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz) end_date (str): End date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -168,6 +219,7 @@ def raw( """ raw_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -179,8 +231,17 @@ def raw( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + + if "display_uom" in raw_parameters and raw_parameters["display_uom"] is True: + if raw_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return raw.get(self.connection, raw_parameters) def resample( @@ -237,7 +298,7 @@ def resample( agg_method (str): Aggregation Method (first, last, avg, min, max) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -247,6 +308,7 @@ def resample( resample_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -262,9 +324,20 @@ def resample( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in resample_parameters + and resample_parameters["display_uom"] is True + ): + if resample_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return resample.get(self.connection, resample_parameters) def plot( @@ -318,7 +391,7 @@ def plot( time_interval_unit (str): The time interval unit (second, minute, day, hour) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -328,6 +401,7 @@ def plot( plot_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -342,9 +416,17 @@ def plot( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if "display_uom" in plot_parameters and plot_parameters["display_uom"] is True: + if plot_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return plot.get(self.connection, plot_parameters) def interpolate( @@ -404,7 +486,7 @@ def interpolate( interpolation_method (str): Interpolation method (forward_fill, backward_fill, linear) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -413,6 +495,7 @@ def interpolate( """ interpolation_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -429,9 +512,20 @@ def interpolate( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in interpolation_parameters + and interpolation_parameters["display_uom"] is True + ): + if interpolation_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return interpolate.get(self.connection, interpolation_parameters) def interpolation_at_time( @@ -478,7 +572,7 @@ def interpolation_at_time( include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False window_length (optional int): Add longer window time in days for the start or end of specified date to cater for edge cases pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -487,6 +581,7 @@ def interpolation_at_time( """ interpolation_at_time_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "timestamps": timestamp_filter, "include_bad_data": include_bad_data, @@ -499,9 +594,20 @@ def interpolation_at_time( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in interpolation_at_time_parameters + and interpolation_at_time_parameters["display_uom"] is True + ): + if interpolation_at_time_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return interpolation_at_time.get( self.connection, interpolation_at_time_parameters ) @@ -564,7 +670,7 @@ def time_weighted_average( include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False window_length (optional int): Add longer window time in days for the start or end of specified date to cater for edge cases pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -573,6 +679,7 @@ def time_weighted_average( """ time_weighted_average_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -594,9 +701,20 @@ def time_weighted_average( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in time_weighted_average_parameters + and time_weighted_average_parameters["display_uom"] is True + ): + if time_weighted_average_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return time_weighted_average.get( self.connection, time_weighted_average_parameters ) @@ -687,7 +805,7 @@ def latest( Args: tagname_filter (list str): List of tagnames to filter on the source - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -696,14 +814,26 @@ def latest( """ latest_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": [] if tagname_filter is None else tagname_filter, "tagname_column": self.tagname_column, "display_uom": display_uom, "limit": limit, "offset": offset, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in latest_parameters + and latest_parameters["display_uom"] is True + ): + if latest_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return latest.get(self.connection, latest_parameters) def circular_average( @@ -763,7 +893,7 @@ def circular_average( upper_bound (int): Upper boundary for the sample range include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -772,6 +902,7 @@ def circular_average( """ circular_average_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -788,9 +919,20 @@ def circular_average( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in circular_average_parameters + and circular_average_parameters["display_uom"] is True + ): + if circular_average_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return circular_average.get(self.connection, circular_average_parameters) def circular_standard_deviation( @@ -850,7 +992,7 @@ def circular_standard_deviation( upper_bound (int): Upper boundary for the sample range include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False - display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -859,6 +1001,7 @@ def circular_standard_deviation( """ circular_stdev_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -875,9 +1018,20 @@ def circular_standard_deviation( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + if ( + "display_uom" in circular_stdev_parameters + and circular_stdev_parameters["display_uom"] is True + ): + if circular_stdev_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return circular_standard_deviation.get( self.connection, circular_stdev_parameters ) @@ -925,7 +1079,7 @@ def summary( start_date (str): Start date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz) end_date (str): End date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False - display_uom (optional bool): Display the unit of measure with True or False. Does not apply to pivoted tables. Defaults to False + display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. 
If True, metadata_source must be populated limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -934,6 +1088,7 @@ def summary( """ summary_parameters = { "source": self.data_source, + "metadata_source": self.metadata_source, "tag_names": tagname_filter, "start_date": start_date, "end_date": end_date, @@ -945,6 +1100,18 @@ def summary( "timestamp_column": self.timestamp_column, "status_column": self.status_column, "value_column": self.value_column, + "metadata_tagname_column": self.metadata_tagname_column, + "metadata_uom_column": self.metadata_uom_column, "supress_warning": True, } + + if ( + "display_uom" in summary_parameters + and summary_parameters["display_uom"] is True + ): + if summary_parameters["metadata_source"] is None: + raise ValueError( + "display_uom True requires metadata_source to be populated" + ) + return summary.get(self.connection, summary_parameters)