diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 500ad212e..aba251bcd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,7 +27,7 @@ jobs: matrix: os: [ubuntu-latest] python-version: ["3.9", "3.10", "3.11", "3.12"] - pyspark: ["3.3.0", "3.3.1", "3.3.2", "3.4.0", "3.4.1", "3.5.0", "3.5.1"] + pyspark: ["3.3.0", "3.3.1", "3.3.2", "3.4.0", "3.4.1", "3.5.0", "3.5.1"] # 3.5.2 does not work with conda exclude: - pyspark: "3.5.1" python-version: "3.9" diff --git a/environment.yml b/environment.yml index 53682960a..f0a7917bf 100644 --- a/environment.yml +++ b/environment.yml @@ -20,7 +20,7 @@ channels: dependencies: - python>=3.9,<3.13 - importlib-metadata>=7.0.0 - - jinja2>=3.1.4 + - jinja2>=3.1.5 - pytest==7.4.0 - pytest-mock==3.11.1 - pytest-cov==4.1.0 @@ -36,7 +36,7 @@ dependencies: - azure-mgmt-storage>=21.0.0 - boto3>=1.28.2,<2.0.0 - pyodbc>=4.0.39,<5.3.0 - - fastapi>=0.110.0,<1.0.0 + - fastapi>=0.115.6,<1.0.0 - httpx>=0.24.1,<1.0.0 - pyspark>=3.3.0,<3.6.0 - delta-spark>=2.2.0,<3.3.0 diff --git a/setup.py b/setup.py index 1f3892e12..71ab266b1 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ "pyarrow>=14.0.1,<17.0.0", "azure-identity>=1.12.0,<2.0.0", "pandas>=1.5.2,<2.2.0", - "jinja2>=3.1.2,<4.0.0", + "jinja2>=3.1.5,<4.0.0", "importlib_metadata>=7.0.0,<8.0.0", "semver>=3.0.0,<4.0.0", "xlrd>=2.0.1,<3.0.0", diff --git a/src/api/requirements.txt b/src/api/requirements.txt index 9e30382de..1e455e720 100644 --- a/src/api/requirements.txt +++ b/src/api/requirements.txt @@ -1,6 +1,6 @@ # Do not include azure-functions-worker as it may conflict with the Azure Functions platform azure-functions==1.18.0 -fastapi==0.110.0 +fastapi==0.115.6 pydantic==2.6.0 # turbodbc==4.11.0 pyodbc==4.0.39 @@ -10,7 +10,7 @@ azure-identity==1.17.0 oauthlib>=3.2.2 pandas>=2.0.1,<3.0.0 numpy==1.26.4 -jinja2==3.1.4 +jinja2==3.1.5 pytz==2024.1 semver==3.0.2 xlrd==2.0.1 diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py index e89c7b07c..a5a31ca2f 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/_time_series_query_builder.py @@ -27,43 +27,379 @@ seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800} -def _raw_query(parameters_dict: dict) -> str: - raw_query = ( - 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`{{ timestamp_column }}`), "{{ time_zone }}") AS `{{ timestamp_column }}`, `{{ tagname_column }}`, {% if include_status is defined and include_status == true %} `{{ status_column }}`, {% endif %} `{{ value_column }}` FROM ' - "{% if source is defined and source is not none %}" - "`{{ source|lower }}` " - "{% else %}" - "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_events_{{ data_type|lower }}` " - "{% endif %}" - "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" - "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND to_timestamp(\"{{ end_date }}\") AND UPPER(`{{ tagname_column }}`) IN ('{{ tag_names | join('\\', \\'') | upper }}') " - "{% else %}" - "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND to_timestamp(\"{{ end_date }}\") AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}') " - "{% endif %}" - "{% if include_status is defined and include_status == 
true and include_bad_data is defined and include_bad_data == false %}"
-        "AND `{{ status_column }}` <> 'Bad'"
-        "{% endif %}"
-        "ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` "
-        ") "
-        "{% if display_uom is defined and display_uom == true %}"
-        'SELECT {% if to_json is defined and to_json == true %}to_json(struct(e.`EventTime`, e.`TagName`, e.`Status`, e.`Value`, m.`UOM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}e.`EventTime`, e.`TagName`, e.`Status`, e.`Value`, m.`UOM`{% endif %} FROM raw_events e '
-        "LEFT OUTER JOIN "
-        "{% if metadata_source is defined and metadata_source is not none %}"
-        "`{{ metadata_source|lower }}` m ON e.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` "
-        "{% else %}"
-        "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON e.`{{ tagname_column }}` = m.`{{ tagname_column }}` "
-        "{% endif %}"
-        "{% else %}"
-        'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM raw_events '
-        "{% endif %}"
-        "{% if limit is defined and limit is not none %}"
-        "LIMIT {{ limit }} "
-        "{% endif %}"
-        "{% if offset is defined and offset is not none %}"
-        "OFFSET {{ offset }} "
-        "{% endif %}"
-    )
+def _build_sql_cte_statement(sql_query_list):
+    sql_cte_query = ", ".join(
+        [sql_query["sql_query"] for sql_query in sql_query_list[:-1]],
+    )
+
+    sql_cte_query = " ".join(["WITH", sql_cte_query])
+
+    if len(sql_query_list) > 1:
+        sql_cte_query = " ".join([sql_cte_query, sql_query_list[-1]["sql_query"]])
+
+    return sql_cte_query
+
+
+def _window_start_time_offset(start_date, time_interval_rate, time_interval_unit: str):
+    time_interval_rate_number = float(time_interval_rate)
+
+    if "day" in time_interval_unit:
+        time_interval_rate_seconds = time_interval_rate_number * 24 * 3600
+    elif "hour" in time_interval_unit:
+        time_interval_rate_seconds = time_interval_rate_number * 3600
+    elif "minute" in time_interval_unit:
+        time_interval_rate_seconds = time_interval_rate_number * 60
+    elif "second" in time_interval_unit:
+        time_interval_rate_seconds = time_interval_rate_number
+
+    # Calculate the offset for the window startTime parameter
+    offset_start_time = (
+        datetime.strptime(start_date, TIMESTAMP_FORMAT).timestamp()
+        % time_interval_rate_seconds
+    )
+
+    offset_start_time = f"{int(offset_start_time)} second"
+    return offset_start_time
+
+
+def _build_raw_query(
+    sql_query_name,
+    timestamp_column,
+    tagname_column,
+    status_column,
+    value_column,
+    start_date,
+    end_date,
+    time_zone,
+    time_interval_rate=None,
+    time_interval_unit=None,
+    agg_method=None,
+    deduplicate=None,
+    source=None,
+    business_unit=None,
+    asset=None,
+    data_security_level=None,
+    data_type=None,
+    tag_names=None,
+    include_status=None,
+    include_bad_data=None,
+    case_insensitivity_tag_search=None,
+    sort=True,
+):
+    # Select
+    raw_query_sql = f"{sql_query_name} AS (SELECT"
+    if agg_method == "avg" or deduplicate == True:
+        raw_query_sql = " ".join([raw_query_sql, "DISTINCT"])
+
+    # Event Time
+    raw_query_sql = " ".join(
+        [
+            raw_query_sql,
+            f"from_utc_timestamp(date_trunc('millisecond',`{timestamp_column}`), '{time_zone}') AS `{timestamp_column}`,",
+        ]
+    )
+    if time_interval_rate is not None:
+        window_offset_start_time = _window_start_time_offset(
+            start_date=start_date,
+            time_interval_rate=time_interval_rate,
+            time_interval_unit=time_interval_unit,
+        )
+        raw_query_sql = " ".join(
+            [
raw_query_sql, + f"window(from_utc_timestamp(date_trunc('millisecond',`{timestamp_column}`), '{time_zone}'), '{time_interval_rate} {time_interval_unit}', '{time_interval_rate} {time_interval_unit}', '{window_offset_start_time}') AS `window`,", + ] + ) + + # Tag Name + raw_query_sql = " ".join([raw_query_sql, f"`{tagname_column}`,"]) + + # Status + if include_status == True: + raw_query_sql = " ".join([raw_query_sql, f"`{status_column}`,"]) + else: + raw_query_sql = " ".join([raw_query_sql, "'Good' AS `Status`,"]) + + # Value + raw_query_sql = " ".join([raw_query_sql, f"`{value_column}` FROM"]) + + if source is not None: + raw_query_sql = " ".join([raw_query_sql, f"`{source.lower()}`"]) + else: + raw_query_sql = " ".join( + [ + raw_query_sql, + f"`{business_unit.lower()}`.`sensors`.`{asset.lower()}_{data_security_level.lower()}_events_{data_type.lower()}`", + ] + ) + + # Where + to_timestamp = ( + f"to_timestamp('{end_date}')" + if time_interval_rate is None + else f"timestampadd({time_interval_unit}, {time_interval_rate}, to_timestamp('{end_date}'))" + ) + + raw_query_sql = " ".join( + [ + raw_query_sql, + f"WHERE `{timestamp_column}` BETWEEN to_timestamp('{start_date}') AND {to_timestamp} AND", + ] + ) + + if case_insensitivity_tag_search == True: + quoted_tag_names = "', '".join([tag.upper() for tag in tag_names]) + raw_query_sql = " ".join( + [ + raw_query_sql, + f"UPPER(`{tagname_column}`) IN ('{quoted_tag_names}')", + ] + ) + else: + quoted_tag_names = "', '".join(tag_names) + raw_query_sql = " ".join( + [ + raw_query_sql, + f"`{tagname_column}` IN ('{quoted_tag_names}')", + ] + ) + + if include_status == True and include_bad_data == False: + raw_query_sql = " ".join([raw_query_sql, f"AND `{status_column}` <> 'Bad'"]) + + if sort == True: + raw_query_sql = " ".join( + [ + raw_query_sql, + f"ORDER BY `{tagname_column}`, `{timestamp_column}`", + ] + ) + raw_query_sql += ")" + + return raw_query_sql + + +def _build_resample_query( + sql_query_list, + sql_query_name, + timestamp_column, + tagname_column, + value_column, + tag_names, + start_date, + end_date, + time_zone, + time_interval_rate, + time_interval_unit, + agg_method, + case_insensitivity_tag_search, + fill=False, + sort=True, +): + parent_sql_query_name = sql_query_list[-1]["query_name"] + + from_sql = parent_sql_query_name + timestamp_sql = f"{parent_sql_query_name}.`window`.start" + tagname_sql = f"{parent_sql_query_name}.`{tagname_column}`" + groupby_sql = f"{parent_sql_query_name}.`{tagname_column}`, {parent_sql_query_name}.`window`.start" + + if fill == True: + quoted_tag_names = ( + "', '".join([tag.upper() for tag in tag_names]) + if case_insensitivity_tag_search == True + else "', '".join(tag_names) + ) + date_fill_query = f"fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('{start_date}'), '{time_zone}'), from_utc_timestamp(to_timestamp('{end_date}'), '{time_zone}'), INTERVAL '{time_interval_rate} {time_interval_unit}')) AS `{timestamp_column}`, explode(array('{quoted_tag_names}')) AS `{tagname_column}`)" + from_sql = f"fill_intervals LEFT OUTER JOIN {parent_sql_query_name} ON fill_intervals.`{timestamp_column}` = {parent_sql_query_name}.`window`.start AND fill_intervals.`{tagname_column}` = {parent_sql_query_name}.`{tagname_column}`" + timestamp_sql = f"fill_intervals.`{timestamp_column}`" + tagname_sql = f"fill_intervals.`{tagname_column}`" + groupby_sql = ( + f"fill_intervals.`{tagname_column}`, fill_intervals.`{timestamp_column}`" + ) + + resample_query_sql = f"{sql_query_name} AS 
(SELECT {tagname_sql}, {timestamp_sql} AS `{timestamp_column}`, {agg_method}({parent_sql_query_name}.`{value_column}`) AS `{value_column}` FROM {from_sql} GROUP BY {groupby_sql}"
+
+    if fill == True:
+        resample_query_sql = ", ".join(
+            [
+                date_fill_query,
+                resample_query_sql,
+            ]
+        )
+
+    if sort == True:
+        resample_query_sql = " ".join(
+            [
+                resample_query_sql,
+                f"ORDER BY `{tagname_column}`, `{timestamp_column}`",
+            ]
+        )
+
+    return resample_query_sql + ")"
+
+
+def _build_interpolate_query(
+    sql_query_list,
+    sql_query_name,
+    tagname_column,
+    timestamp_column,
+    value_column,
+    interpolation_method,
+    sort=True,
+):
+    parent_sql_query_name = sql_query_list[-1]["query_name"]
+
+    interpolate_query_sql = (
+        f"{sql_query_name} AS (SELECT `{timestamp_column}`, `{tagname_column}`,"
+    )
+
+    if interpolation_method == "forward_fill":
+        interpolate_query_sql = f"{sql_query_name} AS (SELECT `{timestamp_column}`, `{tagname_column}`, last_value(`{value_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `{value_column}` FROM {parent_sql_query_name}"
+    elif interpolation_method == "backward_fill":
+        interpolate_query_sql = f"{sql_query_name} AS (SELECT `{timestamp_column}`, `{tagname_column}`, first_value(`{value_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `{value_column}` FROM {parent_sql_query_name}"
+    elif interpolation_method == "linear":
+        linear_interpolate_query_sql = " ".join(
+            [
+                f"linear_interpolation_calculations AS (SELECT coalesce(`{tagname_column}`, `{tagname_column}`) AS `{tagname_column}`, coalesce(`{timestamp_column}`, `{timestamp_column}`) AS `{timestamp_column}`, `{timestamp_column}` AS `Requested_{timestamp_column}`, `{timestamp_column}` AS `Found_{timestamp_column}`, `{value_column}`,",
+                f"last_value(`{timestamp_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{timestamp_column}`, last_value(`{value_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{value_column}`,",
+                f"first_value(`{timestamp_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{timestamp_column}`, first_value(`{value_column}`, true) OVER (PARTITION BY `{tagname_column}` ORDER BY `{timestamp_column}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{value_column}`,",
+                f"CASE WHEN `{value_column}` is NULL THEN `Last_{value_column}` + (unix_timestamp(`{timestamp_column}`) - unix_timestamp(`Last_{timestamp_column}`)) * ((`Next_{value_column}` - `Last_{value_column}`)) / ((unix_timestamp(`Next_{timestamp_column}`) - unix_timestamp(`Last_{timestamp_column}`))) ELSE `{value_column}` END AS `linear_interpolated_{value_column}` FROM {parent_sql_query_name})",
+            ]
+        )
+
+        interpolate_query_sql = (
+            linear_interpolate_query_sql
+            + ", "
+            + interpolate_query_sql
+            + f" `linear_interpolated_{value_column}` AS `{value_column}` FROM linear_interpolation_calculations"
+        )
+    else:
+        interpolate_query_sql = (
+            f"{sql_query_name} AS (SELECT * FROM {parent_sql_query_name}"
+        )
+
+    if sort == True:
+        interpolate_query_sql = " ".join(
+            [
+                interpolate_query_sql,
+                f"ORDER BY `{tagname_column}`, `{timestamp_column}`",
+            ]
+        )
+
+    return interpolate_query_sql + ")"
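To make the string assembly above concrete, here is a small sketch of the fragment the forward-fill branch produces; the single-entry `sql_query_list` and the default column names are illustrative stand-ins, not values taken from a real query:

```python
# Sketch of _build_interpolate_query output for forward_fill (illustrative inputs).
from rtdip_sdk.queries.time_series._time_series_query_builder import (
    _build_interpolate_query,
)

fragment = _build_interpolate_query(
    sql_query_list=[{"query_name": "resample", "sql_query": "resample AS (...)"}],
    sql_query_name="interpolate",
    tagname_column="TagName",
    timestamp_column="EventTime",
    value_column="Value",
    interpolation_method="forward_fill",
    sort=False,
)
# fragment is a single named CTE body:
# interpolate AS (SELECT `EventTime`, `TagName`, last_value(`Value`, true)
#   OVER (PARTITION BY `TagName` ORDER BY `EventTime`
#   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Value` FROM resample)
```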
+
+
+def _build_pivot_query(
+    sql_query_list,
+    sql_query_name,
+    tagname_column,
+    timestamp_column,
+    value_column,
+    tag_names,
+    is_case_insensitive_tag_search,
+    sort=True,
+):
+    parent_sql_query_name = sql_query_list[-1]["query_name"]
+
+    tag_names_string = (
+        ", ".join([f"'{tag.upper()}' AS `{tag}`" for tag in tag_names])
+        if is_case_insensitive_tag_search == True
+        else ", ".join([f"'{tag}' AS `{tag}`" for tag in tag_names])
+    )
+
+    pivot_query_sql = f"{sql_query_name} AS (SELECT * FROM (SELECT `{timestamp_column}`, `{value_column}`,"
+
+    if is_case_insensitive_tag_search == True:
+        pivot_query_sql = " ".join(
+            [pivot_query_sql, f"UPPER(`{tagname_column}`) AS `{tagname_column}`"]
+        )
+    else:
+        pivot_query_sql = " ".join([pivot_query_sql, f"`{tagname_column}`"])
+
+    pivot_query_sql = " ".join(
+        [
+            pivot_query_sql,
+            f"FROM {parent_sql_query_name}) PIVOT (FIRST(`{value_column}`) FOR `{tagname_column}` IN ({tag_names_string}))",
+        ]
+    )
+
+    if sort == True:
+        pivot_query_sql = " ".join(
+            [
+                pivot_query_sql,
+                f"ORDER BY `{timestamp_column}`",
+            ]
+        )
+
+    return pivot_query_sql + ")"
+
+
+def _build_uom_query(
+    sql_query_list,
+    sql_query_name,
+    metadata_source,
+    business_unit,
+    asset,
+    data_security_level,
+    tagname_column,
+    metadata_tagname_column,
+    metadata_uom_column,
+):
+    parent_sql_query_name = sql_query_list[-1]["query_name"]
+
+    uom_sql_query = f"{sql_query_name} AS (SELECT {parent_sql_query_name}.*, metadata.`{metadata_uom_column}` FROM {parent_sql_query_name} LEFT OUTER JOIN"
+
+    if metadata_source:
+        uom_sql_query = " ".join([uom_sql_query, f"{metadata_source}"])
+    else:
+        uom_sql_query = " ".join(
+            [
+                uom_sql_query,
+                f"`{business_unit.lower()}`.`sensors`.`{asset.lower()}_{data_security_level.lower()}_metadata`",
+            ]
+        )
+
+    uom_sql_query = " ".join(
+        [
+            uom_sql_query,
+            f"AS metadata ON {parent_sql_query_name}.`{tagname_column}` = metadata.`{metadata_tagname_column}`",
+        ]
+    )
+
+    return uom_sql_query + ")"
+
+
+def _build_output_query(sql_query_list, to_json, limit, offset):
+    parent_sql_query_name = sql_query_list[-1]["query_name"]
+
+    output_sql_query = "SELECT"
+
+    if to_json == True:
+        output_sql_query = " ".join(
+            [
+                output_sql_query,
+                'to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) AS Value',
+            ]
+        )
+    else:
+        output_sql_query = " ".join([output_sql_query, "*"])
+
+    output_sql_query = " ".join([output_sql_query, f"FROM {parent_sql_query_name}"])
+
+    if limit is not None:
+        output_sql_query = " ".join([output_sql_query, f"LIMIT {limit}"])
+
+    if offset is not None:
+        output_sql_query = " ".join([output_sql_query, f"OFFSET {offset}"])
+
+    return output_sql_query
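Taken together, each helper contributes one named entry to `sql_query_list`, and `_build_sql_cte_statement` stitches every entry but the last into the WITH clause before appending the final SELECT. A minimal sketch with placeholder SQL:

```python
# Sketch: composing a WITH ... SELECT statement from named fragments.
from rtdip_sdk.queries.time_series._time_series_query_builder import (
    _build_sql_cte_statement,
)

sql_query_list = [
    {"query_name": "raw", "sql_query": "raw AS (SELECT ... FROM events)"},
    {"query_name": "output", "sql_query": "SELECT * FROM raw LIMIT 10"},
]

print(_build_sql_cte_statement(sql_query_list))
# WITH raw AS (SELECT ... FROM events) SELECT * FROM raw LIMIT 10
```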
+
+
+def _raw_query(parameters_dict: dict) -> str:
+
+    sql_query_list = []
+
     raw_parameters = {
         "source": parameters_dict.get("source", None),
         "metadata_source": parameters_dict.get("metadata_source", None),
@@ -76,6 +412,7 @@ def _raw_query(parameters_dict: dict) -> str:
         "end_date": parameters_dict["end_date"],
         "tag_names": list(dict.fromkeys(parameters_dict["tag_names"])),
         "include_bad_data": parameters_dict["include_bad_data"],
+        "sort": parameters_dict.get("sort", True),
         "limit": parameters_dict.get("limit", None),
         "offset": parameters_dict.get("offset", None),
         "display_uom": parameters_dict.get("display_uom", False),
@@ -105,8 +442,136 @@ def _raw_query(parameters_dict: dict) -> str:
         "to_json": parameters_dict.get("to_json", False),
     }
 
-    sql_template = Template(raw_query)
-    return sql_template.render(raw_parameters)
+    raw_query = _build_raw_query(
+        sql_query_name="raw",
+        timestamp_column=raw_parameters["timestamp_column"],
+        tagname_column=raw_parameters["tagname_column"],
+        status_column=raw_parameters["status_column"],
+        value_column=raw_parameters["value_column"],
+        start_date=raw_parameters["start_date"],
+        end_date=raw_parameters["end_date"],
+        time_zone=raw_parameters["time_zone"],
+        deduplicate=True,
+        source=raw_parameters["source"],
+        business_unit=raw_parameters["business_unit"],
+        asset=raw_parameters["asset"],
+        data_security_level=raw_parameters["data_security_level"],
+        data_type=raw_parameters["data_type"],
+        tag_names=raw_parameters["tag_names"],
+        include_status=raw_parameters["include_status"],
+        include_bad_data=raw_parameters["include_bad_data"],
+        case_insensitivity_tag_search=raw_parameters["case_insensitivity_tag_search"],
+        sort=raw_parameters["sort"],
+    )
+
+    sql_query_list.append({"query_name": "raw", "sql_query": raw_query})
+
+    if raw_parameters["display_uom"] == True:
+        uom_query = _build_uom_query(
+            sql_query_list=sql_query_list,
+            sql_query_name="uom",
+            metadata_source=raw_parameters["metadata_source"],
+            business_unit=raw_parameters["business_unit"],
+            asset=raw_parameters["asset"],
+            data_security_level=raw_parameters["data_security_level"],
+            tagname_column=raw_parameters["tagname_column"],
+            metadata_tagname_column=raw_parameters["metadata_tagname_column"],
+            metadata_uom_column=raw_parameters["metadata_uom_column"],
+        )
+
+        sql_query_list.append({"query_name": "uom", "sql_query": uom_query})
+
+    output_query = _build_output_query(
+        sql_query_list=sql_query_list,
+        to_json=raw_parameters["to_json"],
+        limit=raw_parameters["limit"],
+        offset=raw_parameters["offset"],
+    )
+
+    sql_query_list.append({"query_name": "output", "sql_query": output_query})
+
+    sql_query = _build_sql_cte_statement(sql_query_list)
+
+    return sql_query
 
 
 def _sql_query(parameters_dict: dict) -> str:
@@ -137,62 +602,7 @@ def _sql_query(parameters_dict: dict) -> str:
     return sql_template.render(sql_parameters)
 
 
-def _sample_query(parameters_dict: dict) -> tuple:
-    sample_query = (
-        'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`{{ timestamp_column }}`), "{{ time_zone }}") AS `{{ timestamp_column }}`, `{{ tagname_column }}`, {% if include_status is defined and include_status == true %} `{{ status_column }}`, {% else %} \'Good\' AS `Status`, {% endif %} `{{ value_column }}` FROM '
-        "{% if source is defined and source is not none %}"
-        "`{{ source|lower }}` "
-        "{% else %}"
-        "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_events_{{ data_type|lower }}` "
-        "{% endif %}"
-        "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search 
== true %}" - "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND to_timestamp(\"{{ end_date }}\") AND UPPER(`{{ tagname_column }}`) IN ('{{ tag_names | join('\\', \\'') | upper }}') " - "{% else %}" - "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND to_timestamp(\"{{ end_date }}\") AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}') " - "{% endif %}" - "{% if include_status is defined and include_status == true and include_bad_data is defined and include_bad_data == false %} AND `{{ status_column }}` <> 'Bad' {% endif %}) " - ',date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("{{ start_date }}"), "{{ time_zone }}"), from_utc_timestamp(to_timestamp("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS timestamp_array) ' - ",window_buckets AS (SELECT timestamp_array AS window_start, timestampadd({{time_interval_unit }}, {{ time_interval_rate }}, timestamp_array) AS window_end FROM date_array) " - ",resample AS (SELECT /*+ RANGE_JOIN(d, {{ range_join_seconds }} ) */ d.window_start, d.window_end, e.`{{ tagname_column }}`, {{ agg_method }}(e.`{{ value_column }}`) OVER (PARTITION BY e.`{{ tagname_column }}`, d.window_start ORDER BY e.`{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `{{ value_column }}` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`{{ timestamp_column }}` AND d.window_end > e.`{{ timestamp_column }}`) " - ",project AS (SELECT window_start AS `{{ timestamp_column }}`, `{{ tagname_column }}`, `{{ value_column }}` FROM resample GROUP BY window_start, `{{ tagname_column }}`, `{{ value_column }}` " - "{% if is_resample is defined and is_resample == true %}" - "ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " - "{% endif %}" - ") " - "{% if is_resample is defined and is_resample == true and pivot is defined and pivot == true %}" - "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" - ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, UPPER(`{{ tagname_column }}`) AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" - "{% for i in range(tag_names | length) %}" - "'{{ tag_names[i] | upper }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" - "{% endfor %}" - "{% else %}" - ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, `{{ tagname_column }}` AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" - "{% for i in range(tag_names | length) %}" - "'{{ tag_names[i] }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" - "{% endfor %}" - "{% endif %}" - '))) SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM pivot ORDER BY `{{ timestamp_column }}` ' - "{% else %}" - "{% if display_uom is defined and display_uom == true %}" - 'SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN " - "{% if metadata_source is defined and metadata_source is not none 
%}" - "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " - "{% else %}" - "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` " - "{% endif %}" - "{% else %}" - 'SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ' - "{% endif %}" - "{% endif %}" - "{% if is_resample is defined and is_resample == true and limit is defined and limit is not none %}" - "LIMIT {{ limit }} " - "{% endif %}" - "{% if is_resample is defined and is_resample == true and offset is defined and offset is not none %}" - "OFFSET {{ offset }} " - "{% endif %}" - ) - +def _sample_query_parameters(parameters_dict: dict) -> dict: sample_parameters = { "source": parameters_dict.get("source", None), "metadata_source": parameters_dict.get("metadata_source", None), @@ -208,6 +618,7 @@ def _sample_query(parameters_dict: dict) -> tuple: "time_interval_rate": parameters_dict["time_interval_rate"], "time_interval_unit": parameters_dict["time_interval_unit"], "agg_method": parameters_dict["agg_method"], + "fill": parameters_dict.get("fill", False), "time_zone": parameters_dict["time_zone"], "pivot": parameters_dict.get("pivot", None), "limit": parameters_dict.get("limit", None), @@ -233,16 +644,368 @@ def _sample_query(parameters_dict: dict) -> tuple: "case_insensitivity_tag_search", False ), "display_uom": parameters_dict.get("display_uom", False), + "sort": parameters_dict.get("sort", True), "metadata_tagname_column": parameters_dict.get( "metadata_tagname_column", "TagName" ), "metadata_uom_column": parameters_dict.get("metadata_uom_column", "UoM"), "to_json_resample": parameters_dict.get("to_json", False), } + return sample_parameters + + +def _sample_query(parameters_dict: dict) -> str: + + sample_parameters = _sample_query_parameters(parameters_dict) + + sql_query_list = [] + + raw_query = _build_raw_query( + sql_query_name="raw", + timestamp_column=sample_parameters["timestamp_column"], + tagname_column=sample_parameters["tagname_column"], + status_column=sample_parameters["status_column"], + value_column=sample_parameters["value_column"], + start_date=sample_parameters["start_date"], + end_date=sample_parameters["end_date"], + time_interval_rate=sample_parameters["time_interval_rate"], + time_interval_unit=sample_parameters["time_interval_unit"], + agg_method=sample_parameters["agg_method"], + time_zone=sample_parameters["time_zone"], + source=sample_parameters["source"], + business_unit=sample_parameters["business_unit"], + asset=sample_parameters["asset"], + data_security_level=sample_parameters["data_security_level"], + data_type=sample_parameters["data_type"], + tag_names=sample_parameters["tag_names"], + include_status=sample_parameters["include_status"], + case_insensitivity_tag_search=sample_parameters[ + "case_insensitivity_tag_search" + ], + sort=False, + ) + + sql_query_list.append({"query_name": "raw", "sql_query": raw_query}) + + resample_query = _build_resample_query( + sql_query_list=sql_query_list, + sql_query_name="resample", + timestamp_column=sample_parameters["timestamp_column"], + tagname_column=sample_parameters["tagname_column"], + value_column=sample_parameters["value_column"], + tag_names=sample_parameters["tag_names"], + start_date=sample_parameters["start_date"], + end_date=sample_parameters["end_date"], 
+ time_zone=sample_parameters["time_zone"], + time_interval_rate=sample_parameters["time_interval_rate"], + time_interval_unit=sample_parameters["time_interval_unit"], + agg_method=sample_parameters["agg_method"], + case_insensitivity_tag_search=sample_parameters[ + "case_insensitivity_tag_search" + ], + fill=sample_parameters["fill"], + sort=( + sample_parameters["sort"] if sample_parameters["pivot"] == False else False + ), + ) + + sql_query_list.append({"query_name": "resample", "sql_query": resample_query}) + + if sample_parameters["pivot"] == True: + pivot_query = _build_pivot_query( + sql_query_list=sql_query_list, + sql_query_name="pivot", + tagname_column=sample_parameters["tagname_column"], + timestamp_column=sample_parameters["timestamp_column"], + value_column=sample_parameters["value_column"], + tag_names=sample_parameters["tag_names"], + is_case_insensitive_tag_search=sample_parameters[ + "case_insensitivity_tag_search" + ], + sort=sample_parameters["sort"], + ) + + sql_query_list.append({"query_name": "pivot", "sql_query": pivot_query}) + + if sample_parameters["display_uom"] == True: + uom_query = _build_uom_query( + sql_query_list=sql_query_list, + sql_query_name="uom", + metadata_source=sample_parameters["metadata_source"], + business_unit=sample_parameters["business_unit"], + asset=sample_parameters["asset"], + data_security_level=sample_parameters["data_security_level"], + tagname_column=sample_parameters["tagname_column"], + metadata_tagname_column=sample_parameters["metadata_tagname_column"], + metadata_uom_column=sample_parameters["metadata_uom_column"], + ) + + sql_query_list.append({"query_name": "uom", "sql_query": uom_query}) + + output_query = _build_output_query( + sql_query_list=sql_query_list, + to_json=sample_parameters["to_json_resample"], + limit=sample_parameters["limit"], + offset=sample_parameters["offset"], + ) + + sql_query_list.append({"query_name": "output", "sql_query": output_query}) + + sql_query = _build_sql_cte_statement(sql_query_list) + + return sql_query + + # sample_query = ( + # "WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc(\"millisecond\",`{{ timestamp_column }}`), \"{{ time_zone }}\") AS `{{ timestamp_column }}`, `{{ tagname_column }}`, {% if include_status is defined and include_status == true %} `{{ status_column }}`, {% else %} 'Good' AS `Status`, {% endif %} `{{ value_column }}`, window(`{{ timestamp_column }}`, '{{ time_interval_rate + ' ' + time_interval_unit }}') FROM " + # "{% if source is defined and source is not none %}" + # "`{{ source|lower }}` " + # "{% else %}" + # "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_events_{{ data_type|lower }}` " + # "{% endif %}" + # "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" + # "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND timestampadd({{ time_interval_unit }}, {{ time_interval_rate }}, to_timestamp(\"{{ end_date }}\")) AND UPPER(`{{ tagname_column }}`) IN ('{{ tag_names | join('\\', \\'') | upper }}') " + # "{% else %}" + # "WHERE `{{ timestamp_column }}` BETWEEN to_timestamp(\"{{ start_date }}\") AND timestampadd({{ time_interval_unit }}, {{ time_interval_rate }}, to_timestamp(\"{{ end_date }}\")) AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}') " + # "{% endif %}" + # "{% if include_status is defined and include_status == true and include_bad_data is defined and include_bad_data == false %} AND `{{ status_column }}` <> 
'Bad' {% endif %}) " + # ',date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("{{ start_date }}"), "{{ time_zone }}"), from_utc_timestamp(to_timestamp("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS timestamp_array) ' + # ",window_buckets AS (SELECT timestamp_array AS window_start, window(timestamp_array, '{{ time_interval_rate + ' ' + time_interval_unit }}') FROM date_array) " + # ",project AS (SELECT d.window_start AS `{{ timestamp_column }}`, e.`{{ tagname_column }}`, {{ agg_method }}(e.`{{ value_column }}`) AS `{{ value_column }}` FROM window_buckets d INNER JOIN raw_events e ON d.window.start = e.window.start AND d.window.end = e.window.end GROUP BY d.window_start, e.`{{ tagname_column }}` " + # "{% if is_resample is defined and is_resample == true %}" + # "ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + # "{% endif %}" + # ") " + # "{% if is_resample is defined and is_resample == true and pivot is defined and pivot == true %}" + # "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" + # ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, UPPER(`{{ tagname_column }}`) AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" + # "{% for i in range(tag_names | length) %}" + # "'{{ tag_names[i] | upper }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" + # "{% endfor %}" + # "{% else %}" + # ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, `{{ tagname_column }}` AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" + # "{% for i in range(tag_names | length) %}" + # "'{{ tag_names[i] }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" + # "{% endfor %}" + # "{% endif %}" + # '))) SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM pivot ORDER BY `{{ timestamp_column }}` ' + # "{% else %}" + # "{% if display_uom is defined and display_uom == true %}" + # 'SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' + # "LEFT OUTER JOIN " + # "{% if metadata_source is defined and metadata_source is not none %}" + # "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` " + # "{% else %}" + # "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` " + # "{% endif %}" + # "{% else %}" + # 'SELECT {% if to_json_resample is defined and to_json_resample == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ' + # "{% endif %}" + # "{% endif %}" + # "{% if is_resample is defined and is_resample == true and limit is defined and limit is not none %}" + # "LIMIT {{ limit }} " + # "{% endif %}" + # "{% if is_resample is defined and is_resample == true and offset is defined and offset is not none %}" + # "OFFSET {{ offset }} " + # "{% endif %}" + # ) + + # sql_template = 
Template(sample_query)
+    # sql_query = sql_template.render(sample_parameters)
+    # return sql_query, sample_query, sample_parameters
+
+
+def _interpolation_query(parameters_dict: dict) -> str:
+
+    interpolate_parameters = _sample_query_parameters(parameters_dict)
+
+    interpolate_parameters["interpolation_method"] = parameters_dict[
+        "interpolation_method"
+    ]
+
+    sql_query_list = []
+
+    raw_query = _build_raw_query(
+        sql_query_name="raw",
+        timestamp_column=interpolate_parameters["timestamp_column"],
+        tagname_column=interpolate_parameters["tagname_column"],
+        status_column=interpolate_parameters["status_column"],
+        value_column=interpolate_parameters["value_column"],
+        start_date=interpolate_parameters["start_date"],
+        end_date=interpolate_parameters["end_date"],
+        time_interval_rate=interpolate_parameters["time_interval_rate"],
+        time_interval_unit=interpolate_parameters["time_interval_unit"],
+        agg_method=interpolate_parameters["agg_method"],
+        time_zone=interpolate_parameters["time_zone"],
+        source=interpolate_parameters["source"],
+        business_unit=interpolate_parameters["business_unit"],
+        asset=interpolate_parameters["asset"],
+        data_security_level=interpolate_parameters["data_security_level"],
+        data_type=interpolate_parameters["data_type"],
+        tag_names=interpolate_parameters["tag_names"],
+        include_status=interpolate_parameters["include_status"],
+        include_bad_data=interpolate_parameters["include_bad_data"],
+        case_insensitivity_tag_search=interpolate_parameters[
+            "case_insensitivity_tag_search"
+        ],
+        sort=False,
+    )
+
+    sql_query_list.append({"query_name": "raw", "sql_query": raw_query})
+
+    resample_query = _build_resample_query(
+        sql_query_list=sql_query_list,
+        sql_query_name="resample",
+        timestamp_column=interpolate_parameters["timestamp_column"],
+        tagname_column=interpolate_parameters["tagname_column"],
+        value_column=interpolate_parameters["value_column"],
+        tag_names=interpolate_parameters["tag_names"],
+        start_date=interpolate_parameters["start_date"],
+        end_date=interpolate_parameters["end_date"],
+        time_zone=interpolate_parameters["time_zone"],
+        time_interval_rate=interpolate_parameters["time_interval_rate"],
+        time_interval_unit=interpolate_parameters["time_interval_unit"],
+        agg_method=interpolate_parameters["agg_method"],
+        case_insensitivity_tag_search=interpolate_parameters[
+            "case_insensitivity_tag_search"
+        ],
+        fill=True,
+        sort=False,
+    )
+
+    sql_query_list.append({"query_name": "resample", "sql_query": resample_query})
+
+    interpolate_query = _build_interpolate_query(
+        sql_query_list=sql_query_list,
+        sql_query_name="interpolate",
+        timestamp_column=interpolate_parameters["timestamp_column"],
+        tagname_column=interpolate_parameters["tagname_column"],
+        value_column=interpolate_parameters["value_column"],
+        interpolation_method=interpolate_parameters["interpolation_method"],
+        sort=(
+            interpolate_parameters["sort"]
+            if interpolate_parameters["pivot"] == False
+            else False
+        ),
+    )
+
+    sql_query_list.append({"query_name": "interpolate", "sql_query": interpolate_query})
+
+    if interpolate_parameters["pivot"] == True:
+        pivot_query = _build_pivot_query(
+            sql_query_list=sql_query_list,
+            sql_query_name="pivot",
+            tagname_column=interpolate_parameters["tagname_column"],
+            timestamp_column=interpolate_parameters["timestamp_column"],
+            value_column=interpolate_parameters["value_column"],
+            tag_names=interpolate_parameters["tag_names"],
+            is_case_insensitive_tag_search=interpolate_parameters[
+                "case_insensitivity_tag_search"
+            ],
+            sort=interpolate_parameters["sort"],
+        )
+
+        sql_query_list.append({"query_name": 
"pivot", "sql_query": pivot_query}) + + if interpolate_parameters["display_uom"] == True: + uom_query = _build_uom_query( + sql_query_list=sql_query_list, + sql_query_name="uom", + metadata_source=interpolate_parameters["metadata_source"], + business_unit=interpolate_parameters["business_unit"], + asset=interpolate_parameters["asset"], + data_security_level=interpolate_parameters["data_security_level"], + tagname_column=interpolate_parameters["tagname_column"], + metadata_tagname_column=interpolate_parameters["metadata_tagname_column"], + metadata_uom_column=interpolate_parameters["metadata_uom_column"], + ) + + sql_query_list.append({"query_name": "uom", "sql_query": uom_query}) - sql_template = Template(sample_query) - sql_query = sql_template.render(sample_parameters) - return sql_query, sample_query, sample_parameters + output_query = _build_output_query( + sql_query_list=sql_query_list, + to_json=interpolate_parameters["to_json_resample"], + limit=interpolate_parameters["limit"], + offset=interpolate_parameters["offset"], + ) + + sql_query_list.append({"query_name": "output", "sql_query": output_query}) + + sql_query = _build_sql_cte_statement(sql_query_list) + + return sql_query + + # if parameters_dict["interpolation_method"] == "forward_fill": + # interpolation_methods = "last_value/UNBOUNDED PRECEDING/CURRENT ROW" + + # if parameters_dict["interpolation_method"] == "backward_fill": + # interpolation_methods = "first_value/CURRENT ROW/UNBOUNDED FOLLOWING" + + # if ( + # parameters_dict["interpolation_method"] == "forward_fill" + # or parameters_dict["interpolation_method"] == "backward_fill" + # ): + # interpolation_options = interpolation_methods.split("/") + + # interpolate_query = ( + # f"WITH resample AS ({sample_query})" + # '{% if (interpolation_method is defined) and (interpolation_method == "forward_fill" or interpolation_method == "backward_fill") %}' + # ",project AS (SELECT a.`{{ timestamp_column }}`, a.`{{ tagname_column }}`, {{ interpolation_options_0 }}(a.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN {{ interpolation_options_1 }} AND {{ interpolation_options_2 }}) AS `{{ value_column }}` FROM resample a " + # '{% elif (interpolation_method is defined) and (interpolation_method == "linear") %}' + # ",linear_interpolation_calculations AS (SELECT coalesce(a.`{{ tagname_column }}`, a.`{{ tagname_column }}`) AS `{{ tagname_column }}`, coalesce(a.`{{ timestamp_column }}`, a.`{{ timestamp_column }}`) AS `{{ timestamp_column }}`, a.`{{ timestamp_column }}` AS `Requested_{{ timestamp_column }}`, a.`{{ timestamp_column }}` AS `Found_{{ timestamp_column }}`, a.`{{ value_column }}`, " + # "last_value(a.`{{ timestamp_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{{ timestamp_column }}`, last_value(a.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{{ value_column }}`, " + # "first_value(a.`{{ timestamp_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{{ timestamp_column }}`, first_value(a.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{{ value_column }}`, " + 
# "CASE WHEN a.`{{ value_column }}` is NULL THEN `Last_{{ value_column }}` + (unix_timestamp(a.`{{ timestamp_column }}`) - unix_timestamp(`Last_{{ timestamp_column }}`)) * ((`Next_{{ value_column }}` - `Last_{{ value_column }}`)) / ((unix_timestamp(`Next_{{ timestamp_column }}`) - unix_timestamp(`Last_{{ timestamp_column }}`))) ELSE a.`{{ value_column }}` END AS `linear_interpolated_{{ value_column }}` FROM resample a " + # ",project AS (SELECT `{{ timestamp_column }}`, `{{ tagname_column }}`, `linear_interpolated_{{ value_column }}` AS `{{ value_column }}` FROM linear_interpolation_calculations) " + # "{% else %}" + # ",project AS (SELECT * FROM resample) " + # "{% endif %}" + # "{% if pivot is defined and pivot == true %}" + # "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" + # ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, UPPER(`{{ tagname_column }}`) AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" + # "{% for i in range(tag_names | length) %}" + # "'{{ tag_names[i] | upper }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" + # "{% endfor %}" + # "{% else %}" + # ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, `{{ tagname_column }}` AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" + # "{% for i in range(tag_names | length) %}" + # "'{{ tag_names[i] }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" + # "{% endfor %}" + # "{% endif %}" + # '))) SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM pivot ORDER BY `{{ timestamp_column }}` ' + # "{% else %}" + # "{% if display_uom is defined and display_uom == true %}" + # 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' + # "LEFT OUTER JOIN " + # "{% if metadata_source is defined and metadata_source is not none %}" + # "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + # "{% else %}" + # "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " + # "{% endif %}" + # "{% else%}" + # 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' + # "{% endif %}" + # "{% endif %}" + # "{% if limit is defined and limit is not none %}" + # "LIMIT {{ limit }} " + # "{% endif %}" + # "{% if offset is defined and offset is not none %}" + # "OFFSET {{ offset }} " + # "{% endif %}" + # ) + + # interpolate_parameters = sample_parameters.copy() + # interpolate_parameters["interpolation_method"] = parameters_dict[ + # "interpolation_method" + # ] + # if ( + # parameters_dict["interpolation_method"] == "forward_fill" + # or parameters_dict["interpolation_method"] == "backward_fill" + # ): + # 
interpolate_parameters["interpolation_options_0"] = interpolation_options[0] + # interpolate_parameters["interpolation_options_1"] = interpolation_options[1] + # interpolate_parameters["interpolation_options_2"] = interpolation_options[2] + + # sql_template = Template(interpolate_query) + # return sql_template.render(interpolate_parameters) def _plot_query(parameters_dict: dict) -> tuple: @@ -359,89 +1122,6 @@ def _plot_query(parameters_dict: dict) -> tuple: return sql_query, plot_query, plot_parameters -def _interpolation_query( - parameters_dict: dict, sample_query: str, sample_parameters: dict -) -> str: - if parameters_dict["interpolation_method"] == "forward_fill": - interpolation_methods = "last_value/UNBOUNDED PRECEDING/CURRENT ROW" - - if parameters_dict["interpolation_method"] == "backward_fill": - interpolation_methods = "first_value/CURRENT ROW/UNBOUNDED FOLLOWING" - - if ( - parameters_dict["interpolation_method"] == "forward_fill" - or parameters_dict["interpolation_method"] == "backward_fill" - ): - interpolation_options = interpolation_methods.split("/") - - interpolate_query = ( - f"WITH resample AS ({sample_query})" - "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" - ',date_array AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp("{{ start_date }}"), "{{ time_zone }}"), from_utc_timestamp(to_timestamp("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS `{{ timestamp_column }}`, explode(array(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM resample) ' - "{% else %}" - ",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS `{{ timestamp_column }}`, explode(array('{{ tag_names | join('\\', \\'') }}')) AS `{{ tagname_column }}`) " - "{% endif %}" - '{% if (interpolation_method is defined) and (interpolation_method == "forward_fill" or interpolation_method == "backward_fill") %}' - ",project AS (SELECT a.`{{ timestamp_column }}`, a.`{{ tagname_column }}`, {{ interpolation_options_0 }}(b.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN {{ interpolation_options_1 }} AND {{ interpolation_options_2 }}) AS `{{ value_column }}` FROM date_array a LEFT OUTER JOIN resample b ON a.`{{ timestamp_column }}` = b.`{{ timestamp_column }}` AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) " - '{% elif (interpolation_method is defined) and (interpolation_method == "linear") %}' - ",linear_interpolation_calculations AS (SELECT coalesce(a.`{{ tagname_column }}`, b.`{{ tagname_column }}`) AS `{{ tagname_column }}`, coalesce(a.`{{ timestamp_column }}`, b.`{{ timestamp_column }}`) AS `{{ timestamp_column }}`, a.`{{ timestamp_column }}` AS `Requested_{{ timestamp_column }}`, b.`{{ timestamp_column }}` AS `Found_{{ timestamp_column }}`, b.`{{ value_column }}`, " - "last_value(b.`{{ timestamp_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{{ timestamp_column }}`, last_value(b.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Last_{{ value_column }}`, " - "first_value(b.`{{ 
timestamp_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{{ timestamp_column }}`, first_value(b.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Next_{{ value_column }}`, " - "CASE WHEN b.`{{ value_column }}` is NULL THEN `Last_{{ value_column }}` + (unix_timestamp(a.`{{ timestamp_column }}`) - unix_timestamp(`Last_{{ timestamp_column }}`)) * ((`Next_{{ value_column }}` - `Last_{{ value_column }}`)) / ((unix_timestamp(`Next_{{ timestamp_column }}`) - unix_timestamp(`Last_{{ timestamp_column }}`))) ELSE b.`{{ value_column }}` END AS `linear_interpolated_{{ value_column }}` FROM date_array a FULL OUTER JOIN resample b ON a.`{{ timestamp_column }}` = b.`{{ timestamp_column }}` AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) " - ",project AS (SELECT `{{ timestamp_column }}`, `{{ tagname_column }}`, `linear_interpolated_{{ value_column }}` AS `{{ value_column }}` FROM linear_interpolation_calculations) " - "{% else %}" - ",project AS (SELECT * FROM resample) " - "{% endif %}" - "{% if pivot is defined and pivot == true %}" - "{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}" - ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, UPPER(`{{ tagname_column }}`) AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" - "{% for i in range(tag_names | length) %}" - "'{{ tag_names[i] | upper }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" - "{% endfor %}" - "{% else %}" - ",pivot AS (SELECT * FROM (SELECT `{{ timestamp_column }}`, `{{ value_column }}`, `{{ tagname_column }}` AS `{{ tagname_column }}` FROM project) PIVOT (FIRST(`{{ value_column }}`) FOR `{{ tagname_column }}` IN (" - "{% for i in range(tag_names | length) %}" - "'{{ tag_names[i] }}' AS `{{ tag_names[i] }}`{% if not loop.last %}, {% endif %}" - "{% endfor %}" - "{% endif %}" - '))) SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM pivot ORDER BY `{{ timestamp_column }}` ' - "{% else %}" - "{% if display_uom is defined and display_uom == true %}" - 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM`{% endif %} FROM project p ' - "LEFT OUTER JOIN " - "{% if metadata_source is defined and metadata_source is not none %}" - "`{{ metadata_source|lower }}` m ON p.`{{ tagname_column }}` = m.`{{ metadata_tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " - "{% else %}" - "`{{ business_unit|lower }}`.`sensors`.`{{ asset|lower }}_{{ data_security_level|lower }}_metadata` m ON p.`{{ tagname_column }}` = m.`{{ tagname_column }}` ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` " - "{% endif %}" - "{% else%}" - 'SELECT {% if to_json is defined and to_json == true %}to_json(struct(*), map("timestampFormat", "yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSSSSXXX")) as Value{% else %}*{% endif %} FROM project ORDER BY `{{ tagname_column }}`, `{{ timestamp_column }}` ' - "{% endif %}" - "{% endif %}" - "{% if limit is defined and limit is not none %}" - 
"LIMIT {{ limit }} " - "{% endif %}" - "{% if offset is defined and offset is not none %}" - "OFFSET {{ offset }} " - "{% endif %}" - ) - - interpolate_parameters = sample_parameters.copy() - interpolate_parameters["interpolation_method"] = parameters_dict[ - "interpolation_method" - ] - if ( - parameters_dict["interpolation_method"] == "forward_fill" - or parameters_dict["interpolation_method"] == "backward_fill" - ): - interpolate_parameters["interpolation_options_0"] = interpolation_options[0] - interpolate_parameters["interpolation_options_1"] = interpolation_options[1] - interpolate_parameters["interpolation_options_2"] = interpolation_options[2] - - sql_template = Template(interpolate_query) - return sql_template.render(interpolate_parameters) - - def _interpolation_at_time(parameters_dict: dict) -> str: timestamps_deduplicated = list( dict.fromkeys(parameters_dict["timestamps"]) @@ -1097,34 +1777,26 @@ def _query_builder(parameters_dict: dict, query_type: str) -> str: + " " + parameters_dict["time_interval_unit"][0] ) - sample_prepared_query, sample_query, sample_parameters = _sample_query( - parameters_dict - ) + sample_prepared_query = _sample_query(parameters_dict) return sample_prepared_query - if query_type == "plot": + if query_type == "interpolate": parameters_dict["range_join_seconds"] = _convert_to_seconds( parameters_dict["time_interval_rate"] + " " + parameters_dict["time_interval_unit"][0] ) - plot_prepared_query, _, _ = _plot_query(parameters_dict) - return plot_prepared_query + interpolate_prepared_query = _interpolation_query(parameters_dict) + return interpolate_prepared_query - if query_type == "interpolate": + if query_type == "plot": parameters_dict["range_join_seconds"] = _convert_to_seconds( parameters_dict["time_interval_rate"] + " " + parameters_dict["time_interval_unit"][0] ) - to_json_flag = parameters_dict.get("to_json", False) - parameters_dict["to_json"] = False - sample_prepared_query, sample_query, sample_parameters = _sample_query( - parameters_dict - ) - sample_parameters["is_resample"] = False - sample_parameters["to_json"] = to_json_flag - return _interpolation_query(parameters_dict, sample_query, sample_parameters) + plot_prepared_query, _, _ = _plot_query(parameters_dict) + return plot_prepared_query if query_type == "time_weighted_average": parameters_dict["range_join_seconds"] = _convert_to_seconds( diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py index 354b74a31..0b0e86aab 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py @@ -49,6 +49,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame: display_uom (optional bool): Display the unit of measure with True or False. Does not apply to pivoted tables. 
Defaults to False limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns or, if pivot is True, by the Timestamp column case_insensitivity_tag_search (optional bool): Search for tags using case insensitivity with True or case sensitivity with False Returns: diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/raw.py b/src/sdk/python/rtdip_sdk/queries/time_series/raw.py index d29715c9f..7498050b3 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/raw.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/raw.py @@ -44,6 +44,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame: limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows display_uom (optional bool): Display the unit of measure with True or False. Defaults to False + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns case_insensitivity_tag_search (optional bool): Search for tags using case insensitivity with True or case sensitivity with False Returns: diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/resample.py b/src/sdk/python/rtdip_sdk/queries/time_series/resample.py index 1f8f40f13..0b7f5cda7 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/resample.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/resample.py @@ -46,10 +46,12 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame: time_interval_unit (str): The time interval unit (second, minute, day, hour) agg_method (str): Aggregation Method (first, last, avg, min, max) include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False + fill (optional bool): Fill the data with intervals where no data exists. The Value column will be filled with Null pivot (optional bool): Pivot the data on timestamp column with True or do not pivot the data with False display_uom (optional bool): Display the unit of measure with True or False. Does not apply to pivoted tables. Defaults to False limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns or, if pivot is True, by the Timestamp column case_insensitivity_tag_search (optional bool): Search for tags using case insensitivity with True or case sensitivity with False diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py index 9109ddaab..a19b740be 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/time_series_query_builder.py @@ -174,6 +174,7 @@ def raw( end_date: str, include_bad_data: bool = False, display_uom: bool = False, + sort: bool = True, limit: int = None, offset: int = None, ) -> DataFrame: @@ -211,6 +212,7 @@ def raw( end_date (str): End date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False display_uom (optional bool): Display the unit of measure with True or False. Defaults to False.
If True, metadata_source must be populated + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -225,6 +227,7 @@ def raw( "end_date": end_date, "include_bad_data": include_bad_data, "display_uom": display_uom, + "sort": sort, "limit": limit, "offset": offset, "tagname_column": self.tagname_column, @@ -253,8 +256,10 @@ def resample( time_interval_rate: str, time_interval_unit: str, agg_method: str, include_bad_data: bool = False, + fill: bool = False, pivot: bool = False, display_uom: bool = False, + sort: bool = True, limit: int = None, offset: int = None, ) -> DataFrame: @@ -297,8 +302,10 @@ def resample( time_interval_unit (str): The time interval unit (second, minute, day, hour) agg_method (str): Aggregation Method (first, last, avg, min, max) include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False + fill (optional bool): Fill the data with intervals where no data exists. The Value column will be filled with Null pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. If True, metadata_source must be populated + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns or, if pivot is True, by the Timestamp column limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -316,8 +323,10 @@ def resample( "time_interval_rate": time_interval_rate, "time_interval_unit": time_interval_unit, "agg_method": agg_method, + "fill": fill, "pivot": pivot, "display_uom": display_uom, + "sort": sort, "limit": limit, "offset": offset, "tagname_column": self.tagname_column, @@ -350,6 +359,7 @@ def plot( include_bad_data: bool = False, pivot: bool = False, display_uom: bool = False, + sort: bool = True, limit: int = None, offset: int = None, ) -> DataFrame: @@ -392,6 +402,7 @@ def plot( include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False display_uom (optional bool): Display the unit of measure with True or False. Defaults to False. If True, metadata_source must be populated + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -410,6 +421,7 @@ def plot( "include_bad_data": include_bad_data, "pivot": pivot, "display_uom": display_uom, + "sort": sort, "limit": limit, "offset": offset, "tagname_column": self.tagname_column, @@ -441,6 +453,7 @@ def interpolate( include_bad_data: bool = False, pivot: bool = False, display_uom: bool = False, + sort: bool = True, limit: int = None, offset: int = None, ) -> DataFrame: @@ -487,6 +500,7 @@ def interpolate( include_bad_data (optional bool): Include "Bad" data points with True or remove "Bad" data points with False pivot (optional bool): Pivot the data on the timestamp column with True or do not pivot the data with False display_uom (optional bool): Display the unit of measure with True or False. Defaults to False.
If True, metadata_source must be populated + sort (optional bool): Sort the data in ascending order by the TagName and Timestamp columns or, if pivot is True, by the Timestamp column limit (optional int): The number of rows to be returned offset (optional int): The number of rows to skip before returning rows @@ -506,6 +520,7 @@ def interpolate( "interpolation_method": interpolation_method, "pivot": pivot, "display_uom": display_uom, + "sort": sort, "limit": limit, "offset": offset, "tagname_column": self.tagname_column, diff --git a/tests/sdk/python/rtdip_sdk/queries/_test_utils/sdk_test_objects.py b/tests/sdk/python/rtdip_sdk/queries/_test_utils/sdk_test_objects.py index daaae4cd2..27fa00424 100644 --- a/tests/sdk/python/rtdip_sdk/queries/_test_utils/sdk_test_objects.py +++ b/tests/sdk/python/rtdip_sdk/queries/_test_utils/sdk_test_objects.py @@ -33,15 +33,15 @@ MOCKED_QUERY_OFFSET_LIMIT = "LIMIT 10 OFFSET 10 " # Raw -RAW_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ORDER BY `TagName`, `EventTime` ) SELECT * FROM raw_events ' -RAW_MOCKED_QUERY_CHECK_TAGS = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND UPPER(`TagName`) IN (\'MOCKED-TAGNAME\') ORDER BY `TagName`, `EventTime` ) SELECT * FROM raw_events ' -RAW_MOCKED_QUERY_DISPLAY_UOM = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ORDER BY `TagName`, `EventTime` ) SELECT e.`EventTime`, e.`TagName`, e.`Status`, e.`Value`, m.`UOM` FROM raw_events e LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` m ON e.`TagName` = m.`TagName` ' +RAW_MOCKED_QUERY = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND to_timestamp('2011-01-02T23:59:59+00:00') AND `TagName` IN ('mocked-TAGNAME') ORDER BY `TagName`, `EventTime`) SELECT * FROM raw" +RAW_MOCKED_QUERY_CHECK_TAGS = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND to_timestamp('2011-01-02T23:59:59+00:00') AND UPPER(`TagName`) IN ('MOCKED-TAGNAME') ORDER BY `TagName`, `EventTime`) SELECT * FROM 
raw" +RAW_MOCKED_QUERY_DISPLAY_UOM = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND to_timestamp('2011-01-02T23:59:59+00:00') AND `TagName` IN ('mocked-TAGNAME') ORDER BY `TagName`, `EventTime`), uom AS (SELECT raw.*, metadata.`UoM` FROM raw LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` AS metadata ON raw.`TagName` = metadata.`TagName`) SELECT * FROM uom" # Resample -RESAMPLE_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ORDER BY `TagName`, `EventTime` ) SELECT * FROM project ' -RESAMPLE_MOCKED_QUERY_CHECK_TAGS = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND UPPER(`TagName`) IN (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ORDER BY `TagName`, `EventTime` ) SELECT * FROM project ' -RESAMPLE_MOCKED_QUERY_PIVOT = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, 
`Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ORDER BY `TagName`, `EventTime` ) ,pivot AS (SELECT * FROM (SELECT `EventTime`, `Value`, `TagName` AS `TagName` FROM project) PIVOT (FIRST(`Value`) FOR `TagName` IN (\'mocked-TAGNAME\' AS `mocked-TAGNAME`))) SELECT * FROM pivot ORDER BY `EventTime` ' -RESAMPLE_MOCKED_QUERY_UOM = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ORDER BY `TagName`, `EventTime` ) SELECT p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM` FROM project p LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` m ON p.`TagName` = m.`TagName` ' +RESAMPLE_MOCKED_QUERY = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), resample AS (SELECT raw.`TagName`, raw.`window`.start AS `EventTime`, avg(raw.`Value`) AS `Value` FROM 
raw GROUP BY raw.`TagName`, raw.`window`.start ORDER BY `TagName`, `EventTime`) SELECT * FROM resample" +RESAMPLE_MOCKED_QUERY_CHECK_TAGS = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND UPPER(`TagName`) IN ('MOCKED-TAGNAME')), resample AS (SELECT raw.`TagName`, raw.`window`.start AS `EventTime`, avg(raw.`Value`) AS `Value` FROM raw GROUP BY raw.`TagName`, raw.`window`.start ORDER BY `TagName`, `EventTime`) SELECT * FROM resample" +RESAMPLE_MOCKED_QUERY_PIVOT = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), resample AS (SELECT raw.`TagName`, raw.`window`.start AS `EventTime`, avg(raw.`Value`) AS `Value` FROM raw GROUP BY raw.`TagName`, raw.`window`.start), pivot AS (SELECT * FROM (SELECT `EventTime`, `Value`, `TagName` FROM resample) PIVOT (FIRST(`Value`) FOR `TagName` IN ('mocked-TAGNAME' AS `mocked-TAGNAME`)) ORDER BY `EventTime`) SELECT * FROM pivot" +RESAMPLE_MOCKED_QUERY_UOM = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), resample AS (SELECT raw.`TagName`, raw.`window`.start AS `EventTime`, avg(raw.`Value`) AS `Value` FROM raw GROUP BY raw.`TagName`, raw.`window`.start ORDER BY `TagName`, `EventTime`), uom AS (SELECT resample.*, metadata.`UoM` FROM resample LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` AS metadata ON resample.`TagName` = metadata.`TagName`) SELECT * FROM uom" # Plot PLOT_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, 
timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,plot AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, min(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `min_Value`, max(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `max_Value`, first(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `first_Value`, last(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `last_Value`, first(CASE WHEN `Status` = \'Bad\' THEN struct(e.`Value`, e.`EventTime`) ELSE null END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `excp_Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,deduplicate AS (SELECT window_start AS `EventTime`, `TagName`, `min_Value` as `Min`, `max_Value` as `Max`, `first_Value` as `First`, `last_Value` as `Last`, `excp_Value` as `Exception` FROM plot GROUP BY window_start, `TagName`, `min_Value`, `max_Value`, `first_Value`, `last_Value`, `excp_Value`) ,project AS (SELECT distinct Values.EventTime, `TagName`, Values.Value FROM (SELECT * FROM deduplicate UNPIVOT (`Values` for `Aggregation` IN (`Min`, `Max`, `First`, `Last`, `Exception`))) ORDER BY `TagName`, `EventTime` ) SELECT * FROM project ' @@ -50,11 +50,11 @@ PLOT_MOCKED_QUERY_UOM = 'WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,plot AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, min(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `min_Value`, max(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `max_Value`, first(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING) AS `first_Value`, last(CASE WHEN `Status` = \'Bad\' THEN null ELSE struct(e.`Value`, e.`EventTime`) END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `last_Value`, first(CASE WHEN `Status` = \'Bad\' THEN struct(e.`Value`, e.`EventTime`) ELSE null END, True) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `excp_Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,deduplicate AS (SELECT window_start AS `EventTime`, `TagName`, `min_Value` as `Min`, `max_Value` as `Max`, `first_Value` as `First`, `last_Value` as `Last`, `excp_Value` as `Exception` FROM plot GROUP BY window_start, `TagName`, `min_Value`, `max_Value`, `first_Value`, `last_Value`, `excp_Value`) ,project AS (SELECT distinct Values.EventTime, `TagName`, Values.Value FROM (SELECT * FROM deduplicate UNPIVOT (`Values` for `Aggregation` IN (`Min`, `Max`, `First`, `Last`, `Exception`))) ORDER BY `TagName`, `EventTime` ) SELECT p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM` FROM project p LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` m ON p.`TagName` = m.`TagName` ' # Interpolate -INTERPOLATE_MOCKED_QUERY = 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ) SELECT * FROM project ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(\'mocked-TAGNAME\')) AS `TagName`) ,project AS (SELECT a.`EventTime`, a.`TagName`, last_value(b.`Value`, true) OVER (PARTITION BY a.`TagName` ORDER BY a.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Value` FROM date_array a LEFT OUTER JOIN resample b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) SELECT * FROM project ORDER BY `TagName`, `EventTime` ' -INTERPOLATE_MOCKED_QUERY_BACKWARD_FILL = 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM 
`mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ) SELECT * FROM project ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(\'mocked-TAGNAME\')) AS `TagName`) ,project AS (SELECT a.`EventTime`, a.`TagName`, first_value(b.`Value`, true) OVER (PARTITION BY a.`TagName` ORDER BY a.`EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM date_array a LEFT OUTER JOIN resample b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) SELECT * FROM project ORDER BY `TagName`, `EventTime` ' -INTERPOLATE_MOCKED_QUERY_CHECK_TAGS = 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND UPPER(`TagName`) IN (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ) SELECT * FROM project ),date_array AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(`TagName`)) AS `TagName` FROM resample) ,project AS (SELECT a.`EventTime`, a.`TagName`, first_value(b.`Value`, true) OVER (PARTITION BY a.`TagName` ORDER BY a.`EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED 
FOLLOWING) AS `Value` FROM date_array a LEFT OUTER JOIN resample b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) SELECT * FROM project ORDER BY `TagName`, `EventTime` ' -INTERPOLATE_MOCKED_QUERY_PIVOT = 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` FROM resample GROUP BY window_start, `TagName`, `Value` ) SELECT * FROM project ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(\'mocked-TAGNAME\')) AS `TagName`) ,project AS (SELECT a.`EventTime`, a.`TagName`, first_value(b.`Value`, true) OVER (PARTITION BY a.`TagName` ORDER BY a.`EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM date_array a LEFT OUTER JOIN resample b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) ,pivot AS (SELECT * FROM (SELECT `EventTime`, `Value`, `TagName` AS `TagName` FROM project) PIVOT (FIRST(`Value`) FOR `TagName` IN (\'mocked-TAGNAME\' AS `mocked-TAGNAME`))) SELECT * FROM pivot ORDER BY `EventTime` ' -INTERPOLATE_MOCKED_QUERY_UOM = 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array) ,window_buckets AS (SELECT timestamp_array AS window_start, timestampadd(minute, 15, timestamp_array) AS window_end FROM date_array) ,resample AS (SELECT /*+ RANGE_JOIN(d, 900 ) */ d.window_start, d.window_end, e.`TagName`, avg(e.`Value`) OVER (PARTITION BY e.`TagName`, d.window_start ORDER BY e.`EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `Value` FROM window_buckets d INNER JOIN raw_events e ON d.window_start <= e.`EventTime` AND d.window_end > e.`EventTime`) ,project AS (SELECT window_start AS `EventTime`, `TagName`, `Value` 
FROM resample GROUP BY window_start, `TagName`, `Value` ) SELECT p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM` FROM project p LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` m ON p.`TagName` = m.`TagName` ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(\'mocked-TAGNAME\')) AS `TagName`) ,project AS (SELECT a.`EventTime`, a.`TagName`, first_value(b.`Value`, true) OVER (PARTITION BY a.`TagName` ORDER BY a.`EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM date_array a LEFT OUTER JOIN resample b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) SELECT p.`EventTime`, p.`TagName`, p.`Value`, m.`UoM` FROM project p LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` m ON p.`TagName` = m.`TagName` ORDER BY `TagName`, `EventTime` ' +INTERPOLATE_MOCKED_QUERY = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('2011-01-01T00:00:00+00:00'), '+0000'), from_utc_timestamp(to_timestamp('2011-01-02T23:59:59+00:00'), '+0000'), INTERVAL '15 minute')) AS `EventTime`, explode(array('mocked-TAGNAME')) AS `TagName`), resample AS (SELECT fill_intervals.`TagName`, fill_intervals.`EventTime` AS `EventTime`, avg(raw.`Value`) AS `Value` FROM fill_intervals LEFT OUTER JOIN raw ON fill_intervals.`EventTime` = raw.`window`.start AND fill_intervals.`TagName` = raw.`TagName` GROUP BY fill_intervals.`TagName`, fill_intervals.`EventTime`), interpolate AS (SELECT `EventTime`, `TagName`, last_value(`Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Value` FROM resample ORDER BY `TagName`, `EventTime`) SELECT * FROM interpolate" +INTERPOLATE_MOCKED_QUERY_BACKWARD_FILL = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('2011-01-01T00:00:00+00:00'), '+0000'), from_utc_timestamp(to_timestamp('2011-01-02T23:59:59+00:00'), '+0000'), INTERVAL '15 minute')) AS `EventTime`, explode(array('mocked-TAGNAME')) AS `TagName`), resample AS (SELECT fill_intervals.`TagName`, fill_intervals.`EventTime` AS `EventTime`, avg(raw.`Value`) AS `Value` FROM fill_intervals LEFT OUTER JOIN raw ON 
fill_intervals.`EventTime` = raw.`window`.start AND fill_intervals.`TagName` = raw.`TagName` GROUP BY fill_intervals.`TagName`, fill_intervals.`EventTime`), interpolate AS (SELECT `EventTime`, `TagName`, first_value(`Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM resample ORDER BY `TagName`, `EventTime`) SELECT * FROM interpolate" +INTERPOLATE_MOCKED_QUERY_CHECK_TAGS = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND UPPER(`TagName`) IN ('MOCKED-TAGNAME')), fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('2011-01-01T00:00:00+00:00'), '+0000'), from_utc_timestamp(to_timestamp('2011-01-02T23:59:59+00:00'), '+0000'), INTERVAL '15 minute')) AS `EventTime`, explode(array('MOCKED-TAGNAME')) AS `TagName`), resample AS (SELECT fill_intervals.`TagName`, fill_intervals.`EventTime` AS `EventTime`, avg(raw.`Value`) AS `Value` FROM fill_intervals LEFT OUTER JOIN raw ON fill_intervals.`EventTime` = raw.`window`.start AND fill_intervals.`TagName` = raw.`TagName` GROUP BY fill_intervals.`TagName`, fill_intervals.`EventTime`), interpolate AS (SELECT `EventTime`, `TagName`, first_value(`Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM resample ORDER BY `TagName`, `EventTime`) SELECT * FROM interpolate" +INTERPOLATE_MOCKED_QUERY_PIVOT = "WITH raw AS (SELECT DISTINCT from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('2011-01-01T00:00:00+00:00'), '+0000'), from_utc_timestamp(to_timestamp('2011-01-02T23:59:59+00:00'), '+0000'), INTERVAL '15 minute')) AS `EventTime`, explode(array('mocked-TAGNAME')) AS `TagName`), resample AS (SELECT fill_intervals.`TagName`, fill_intervals.`EventTime` AS `EventTime`, avg(raw.`Value`) AS `Value` FROM fill_intervals LEFT OUTER JOIN raw ON fill_intervals.`EventTime` = raw.`window`.start AND fill_intervals.`TagName` = raw.`TagName` GROUP BY fill_intervals.`TagName`, fill_intervals.`EventTime`), interpolate AS (SELECT `EventTime`, `TagName`, first_value(`Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM resample), pivot AS (SELECT * FROM (SELECT `EventTime`, `Value`, `TagName` FROM interpolate) PIVOT (FIRST(`Value`) FOR `TagName` IN ('mocked-TAGNAME' AS `mocked-TAGNAME`)) ORDER BY `EventTime`) SELECT * FROM pivot" +INTERPOLATE_MOCKED_QUERY_UOM = "WITH raw AS (SELECT DISTINCT 
from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000') AS `EventTime`, window(from_utc_timestamp(date_trunc('millisecond',`EventTime`), '+0000'), '15 minute', '15 minute', '0 second') AS `window`, `TagName`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE `EventTime` BETWEEN to_timestamp('2011-01-01T00:00:00+00:00') AND timestampadd(minute, 15, to_timestamp('2011-01-02T23:59:59+00:00')) AND `TagName` IN ('mocked-TAGNAME')), fill_intervals AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp('2011-01-01T00:00:00+00:00'), '+0000'), from_utc_timestamp(to_timestamp('2011-01-02T23:59:59+00:00'), '+0000'), INTERVAL '15 minute')) AS `EventTime`, explode(array('mocked-TAGNAME')) AS `TagName`), resample AS (SELECT fill_intervals.`TagName`, fill_intervals.`EventTime` AS `EventTime`, avg(raw.`Value`) AS `Value` FROM fill_intervals LEFT OUTER JOIN raw ON fill_intervals.`EventTime` = raw.`window`.start AND fill_intervals.`TagName` = raw.`TagName` GROUP BY fill_intervals.`TagName`, fill_intervals.`EventTime`), interpolate AS (SELECT `EventTime`, `TagName`, first_value(`Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS `Value` FROM resample ORDER BY `TagName`, `EventTime`), uom AS (SELECT interpolate.*, metadata.`UoM` FROM interpolate LEFT OUTER JOIN `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_metadata` AS metadata ON interpolate.`TagName` = metadata.`TagName`) SELECT * FROM uom" # Time Weighted Average TWA_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT `TagName`, from_utc_timestamp(date_trunc("millisecond",`EventTime`), "+0000") AS `EventTime`, `Status`, `Value` FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE to_date(`EventTime`) BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND `TagName` IN (\'mocked-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS `EventTime`, explode(array(\'mocked-TAGNAME\')) AS `TagName`) ,boundary_events AS (SELECT coalesce(a.`TagName`, b.`TagName`) AS `TagName`, coalesce(a.`EventTime`, b.`EventTime`) AS `EventTime`, b.`Status`, b.`Value` FROM date_array a FULL OUTER JOIN raw_events b ON a.`EventTime` = b.`EventTime` AND a.`TagName` = b.`TagName`) ,window_buckets AS (SELECT `EventTime` AS window_start, LEAD(`EventTime`) OVER (ORDER BY `EventTime`) AS window_end FROM (SELECT distinct `EventTime` FROM date_array) ) ,window_events AS (SELECT /*+ RANGE_JOIN(b, 900 ) */ b.`TagName`, b.`EventTime`, a.window_start AS `WindowEventTime`, b.`Status`, b.`Value` FROM boundary_events b LEFT OUTER JOIN window_buckets a ON a.window_start <= b.`EventTime` AND a.window_end > b.`EventTime`) ,fill_status AS (SELECT *, last_value(`Status`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Fill_Status`, CASE WHEN `Fill_Status` <> "Bad" THEN `Value` ELSE null END AS `Good_Value` FROM window_events) ,fill_value AS (SELECT *, last_value(`Good_Value`, true) OVER (PARTITION BY `TagName` ORDER BY `EventTime` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `Fill_Value` FROM fill_status) ,fill_step AS (SELECT *, 
false AS Step FROM fill_value) ,interpolate AS (SELECT *, CASE WHEN `Step` = false AND `Status` IS NULL AND `Value` IS NULL THEN lag(`EventTime`) OVER ( PARTITION BY `TagName` ORDER BY `EventTime` ) ELSE NULL END AS `Previous_EventTime`, CASE WHEN `Step` = false AND `Status` IS NULL AND `Value` IS NULL THEN lag(`Fill_Value`) OVER ( PARTITION BY `TagName` ORDER BY `EventTime` ) ELSE NULL END AS `Previous_Fill_Value`, lead(`EventTime`) OVER ( PARTITION BY `TagName` ORDER BY `EventTime` ) AS `Next_EventTime`, CASE WHEN `Step` = false AND `Status` IS NULL AND `Value` IS NULL THEN lead(`Fill_Value`) OVER ( PARTITION BY `TagName` ORDER BY `EventTime` ) ELSE NULL END AS `Next_Fill_Value`, CASE WHEN `Step` = false AND `Status` IS NULL AND `Value` IS NULL THEN `Previous_Fill_Value` + ( (`Next_Fill_Value` - `Previous_Fill_Value`) * ( ( unix_timestamp(`EventTime`) - unix_timestamp(`Previous_EventTime`) ) / ( unix_timestamp(`Next_EventTime`) - unix_timestamp(`Previous_EventTime`) ) ) ) ELSE NULL END AS `Interpolated_Value`, coalesce(`Interpolated_Value`, `Fill_Value`) as `Event_Value` FROM fill_step ),twa_calculations AS (SELECT `TagName`, `EventTime`, `WindowEventTime`, `Step`, `Status`, `Value`, `Previous_EventTime`, `Previous_Fill_Value`, `Next_EventTime`, `Next_Fill_Value`, `Interpolated_Value`, `Fill_Status`, `Fill_Value`, `Event_Value`, lead(`Fill_Status`) OVER (PARTITION BY `TagName` ORDER BY `EventTime`) AS `Next_Status` , CASE WHEN `Next_Status` <> "Bad" OR (`Fill_Status` <> "Bad" AND `Next_Status` = "Bad") THEN lead(`Event_Value`) OVER (PARTITION BY `TagName` ORDER BY `EventTime`) ELSE `Value` END AS `Next_Value_For_Status` , CASE WHEN `Fill_Status` <> "Bad" THEN `Next_Value_For_Status` ELSE 0 END AS `Next_Value` , CASE WHEN `Fill_Status` <> "Bad" AND `Next_Status` <> "Bad" THEN ((cast(`Next_EventTime` AS double) - cast(`EventTime` AS double)) / 60) WHEN `Fill_Status` <> "Bad" AND `Next_Status` = "Bad" THEN ((cast(`Next_EventTime` AS integer) - cast(`EventTime` AS double)) / 60) ELSE 0 END AS good_minutes , CASE WHEN Step == false THEN ((`Event_Value` + `Next_Value`) * 0.5) * good_minutes ELSE (`Event_Value` * good_minutes) END AS twa_value FROM interpolate) ,twa AS (SELECT `TagName`, `WindowEventTime` AS `EventTime`, sum(twa_value) / sum(good_minutes) AS `Value` from twa_calculations GROUP BY `TagName`, `WindowEventTime`) ,project AS (SELECT * FROM twa WHERE `EventTime` BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59")) SELECT * FROM project ORDER BY `TagName`, `EventTime` ' diff --git a/tests/sdk/python/rtdip_sdk/queries/time_series/test_interpolate.py b/tests/sdk/python/rtdip_sdk/queries/time_series/test_interpolate.py index 87a1ed068..ee1ffb4bd 100644 --- a/tests/sdk/python/rtdip_sdk/queries/time_series/test_interpolate.py +++ b/tests/sdk/python/rtdip_sdk/queries/time_series/test_interpolate.py @@ -118,7 +118,7 @@ def test_interpolate_offset_limit(mocker: MockerFixture): _test_base_succeed( mocker, TEST_PARAMETERS, - INTERPOLATE_MOCKED_QUERY + MOCKED_QUERY_OFFSET_LIMIT, + INTERPOLATE_MOCKED_QUERY + " " + MOCKED_QUERY_OFFSET_LIMIT.strip(), interpolate_get, ) diff --git a/tests/sdk/python/rtdip_sdk/queries/time_series/test_raw.py b/tests/sdk/python/rtdip_sdk/queries/time_series/test_raw.py index 2315d69dd..9c66586dd 100644 --- a/tests/sdk/python/rtdip_sdk/queries/time_series/test_raw.py +++ b/tests/sdk/python/rtdip_sdk/queries/time_series/test_raw.py @@ -56,7 +56,7 @@ def test_raw_offset_limit(mocker: MockerFixture): _test_base_succeed( mocker, 
MOCKED_RAW_DICT, - RAW_MOCKED_QUERY + MOCKED_QUERY_OFFSET_LIMIT, + RAW_MOCKED_QUERY + " " + MOCKED_QUERY_OFFSET_LIMIT.strip(), raw_get, ) diff --git a/tests/sdk/python/rtdip_sdk/queries/time_series/test_resample.py b/tests/sdk/python/rtdip_sdk/queries/time_series/test_resample.py index 35bd18d54..bcdd83fab 100644 --- a/tests/sdk/python/rtdip_sdk/queries/time_series/test_resample.py +++ b/tests/sdk/python/rtdip_sdk/queries/time_series/test_resample.py @@ -97,7 +97,7 @@ def test_resample_offset_limit(mocker: MockerFixture): _test_base_succeed( mocker, MOCKED_RESAMPLED_PARAMETER_DICT, - (RESAMPLE_MOCKED_QUERY + MOCKED_QUERY_OFFSET_LIMIT), + (RESAMPLE_MOCKED_QUERY + " " + MOCKED_QUERY_OFFSET_LIMIT.strip()), resample_get, )
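Reviewer note on the reordered _query_builder dispatch earlier in this patch: the sample, interpolate and plot branches all derive range_join_seconds from the interval rate joined to the first letter of the interval unit. The body of _convert_to_seconds is outside this diff, so the helper below is a hypothetical stand-in; the unit map and the worked 15-minute case are assumptions that happen to line up with the RANGE_JOIN(d, 900 ) hint visible in the mocked plot query.

# Hypothetical sketch only; the real _convert_to_seconds is not shown in
# this diff, so treat the name and the unit map as assumptions.
def convert_to_seconds(interval: str) -> int:
    # Call sites pass e.g. "15" + " " + "minute"[0] -> "15 m".
    rate, unit = interval.split()
    seconds = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
    return int(float(rate) * seconds[unit])

# A 15-minute interval converts to 900 seconds, matching the
# RANGE_JOIN(d, 900 ) hint in the mocked plot query above.
assert convert_to_seconds("15 m") == 900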
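The rewritten resample fixtures replace the old explode(sequence(...)) date array plus range join with Spark's built-in window() bucketing. As a cross-check, here is a minimal PySpark sketch of the same aggregation; the 15-minute interval and column names come from the mocked query, while the sample data and session setup are purely illustrative.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Illustrative stand-in for the events table used in the fixtures.
events = spark.createDataFrame(
    [("mocked-TAGNAME", "2011-01-01 00:03:00", 1.0),
     ("mocked-TAGNAME", "2011-01-01 00:07:00", 3.0)],
    ["TagName", "EventTime", "Value"],
).withColumn("EventTime", F.to_timestamp("EventTime"))

# Same shape as the new resample CTE: 15-minute tumbling windows,
# avg per tag, window start exposed as EventTime.
resampled = (
    events.groupBy("TagName", F.window("EventTime", "15 minute", "15 minute", "0 second"))
    .agg(F.avg("Value").alias("Value"))
    .select("TagName", F.col("window.start").alias("EventTime"), "Value")
    .orderBy("TagName", "EventTime")
)
resampled.show()  # one row: the 00:00 window with Value 2.0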
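For orientation, a hedged usage sketch of the module-level raw.get with the new sort flag. Only the keys documented in this diff (sort, limit, offset, display_uom, include_bad_data) are confirmed here; the remaining key names are modelled on the mocked parameter fixtures and may differ, and the connection object is deliberately left abstract.

from rtdip_sdk.queries.time_series import raw

# Key names other than sort/limit/offset/include_bad_data are assumptions
# based on the mocked fixtures above, not confirmed by this diff.
parameters = {
    "business_unit": "mocked-buiness-unit",
    "asset": "mocked-asset",
    "data_security_level": "mocked-data-security-level",
    "data_type": "mocked-data-type",
    "tag_names": ["mocked-TAGNAME"],
    "start_date": "2011-01-01",
    "end_date": "2011-01-02",
    "include_bad_data": False,
    "sort": True,   # new flag: ascending by TagName, then EventTime
    "limit": 10,
    "offset": 10,
}

# df = raw.get(connection, parameters)  # connection: any RTDIP connection instance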
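Finally, the reason the three test expectations change shape: the rebuilt fixture strings no longer end with a trailing space, while MOCKED_QUERY_OFFSET_LIMIT still does, so the expected query is now assembled with an explicit separator and a stripped suffix. A self-contained illustration of the concatenation the updated assertions perform:

# Mirrors the updated assertions in test_raw, test_resample and
# test_interpolate; base_query stands in for RAW_MOCKED_QUERY.
MOCKED_QUERY_OFFSET_LIMIT = "LIMIT 10 OFFSET 10 "
base_query = "WITH raw AS (...) SELECT * FROM raw"

expected = base_query + " " + MOCKED_QUERY_OFFSET_LIMIT.strip()
assert expected == "WITH raw AS (...) SELECT * FROM raw LIMIT 10 OFFSET 10"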