Merge pull request #26 from jaceksan/working
NAS-2637: FDW - push down some filters

Reviewed-by: https://github.com/pcerny
gdgate authored Dec 10, 2021
2 parents ab49dcd + d337e4d commit 1cfec99
Showing 20 changed files with 281 additions and 68 deletions.
29 changes: 29 additions & 0 deletions gooddata-fdw/README.md
@@ -144,3 +144,32 @@ case insensitive.

Note: If you do not specify the required options, the CREATE command will fail. If you specify wrong entity IDs,
the failures will happen at SELECT time.

## Push down of filters

When querying foreign tables, you can add a WHERE clause to filter the result.
For performance reasons, it makes sense to push such filters down to the GD.CN so that not all data has to be collected.

We are able to push only some filters down to the GD.CN:
- Simple attribute (label) filters
  - Example: `WHERE region IN ('East', 'West')`
- Simple date filters
  - Only DAY granularity is supported
  - The (NOT) IN operator is **not** supported
  - Example: `WHERE my_date BETWEEN '2021-01-01' AND '2021-02-01'`

If you combine conditions with OR, the filter is not pushed down.
Push down is possible for custom tables and the `compute` table, but not for foreign tables imported from `insights`; see the example below.
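
For illustration, here is a sketch assuming a custom foreign table `car_insurance` with a `region` label column and a `my_date` date column (hypothetical names, not taken from this repository). The first filter can be pushed down to the GD.CN; the second cannot, so all data is fetched and PostgreSQL applies the filter itself:

```sql
-- Pushed down: simple attribute (label) filter
SELECT region, claim_amount FROM car_insurance
WHERE region IN ('East', 'West');

-- Not pushed down: OR between conditions
SELECT region, claim_amount FROM car_insurance
WHERE region = 'East' OR my_date >= '2021-01-01';
```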

## Known limitations

It is not possible to reference a column in the WHERE clause if that column is not also used in the SELECT list.
Example (neither query works):
```sql
SELECT label1, metric FROM insight WHERE label2 = 'a';
SELECT label1, metric FROM compute WHERE label2 = 'a';
```

While this is obvious in case of `insight` (the insight does not contain the column at all), in case of `compute` we would like to support it.
Multicorn does not allow it: the filter is always applied to the final result set, and if the result set does not contain the column, the filter cannot match anything.
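
A possible workaround - an assumption based on the Multicorn behavior described above, not a documented feature - is to include the filtered column in the SELECT list so that it is present in the result set when the filter is applied:

```sql
-- label2 is part of the result set, so Multicorn can apply the filter
SELECT label1, label2, metric FROM compute WHERE label2 = 'a';
```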
9 changes: 9 additions & 0 deletions gooddata-fdw/gooddata_fdw/environment.py
@@ -25,6 +25,7 @@
ForeignDataWrapper = multicorn.ForeignDataWrapper
TableDefinition = multicorn.TableDefinition
ColumnDefinition = multicorn.ColumnDefinition
Qual = multicorn.Qual
log_to_postgres = utils.log_to_postgres
except ImportError as e:
# determine if running as part of test suite
@@ -78,3 +79,11 @@ def __init__(self, column_name, type_name, options):
self.options = options

ColumnDefinition = ColumnDefinitionStub

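# Stand-in for multicorn.Qual, used when multicorn is not importable (e.g. in the test suite)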
class QualStub:
def __init__(self, field_name, operator, value):
self.field_name = field_name
self.operator = operator
self.value = value

Qual = QualStub
136 changes: 126 additions & 10 deletions gooddata-fdw/gooddata_fdw/fdw.py
@@ -1,15 +1,27 @@
# (C) 2021 GoodData Corporation
from __future__ import annotations

import datetime
import json
import os
import re
from operator import itemgetter
from typing import Union

import gooddata_sdk as sdk
-from gooddata_fdw.environment import ColumnDefinition, ForeignDataWrapper, TableDefinition
+from gooddata_fdw.environment import ColumnDefinition, ForeignDataWrapper, Qual, TableDefinition
from gooddata_fdw.logging import _log_debug, _log_error, _log_info, _log_warn
from gooddata_fdw.naming import DefaultCatalogNamingStrategy, DefaultInsightColumnNaming, DefaultInsightTableNaming
from gooddata_sdk.catalog import Catalog
-from gooddata_sdk.compute_model import ObjId
+from gooddata_sdk.compute_model import (
+    AbsoluteDateFilter,
+    Attribute,
+    Filter,
+    NegativeAttributeFilter,
+    ObjId,
+    PositiveAttributeFilter,
+    SimpleMetric,
+)
from gooddata_sdk.insight import InsightMetric

_USER_AGENT = "gooddata-fdw/0.1"
@@ -22,8 +34,12 @@
METRIC_DIGITS_BEFORE_DEC_POINT_DEFAULT = "18"
METRIC_DIGITS_AFTER_DEC_POINT_DEFAULT = "2"

# Once AbsoluteDateFilter supports empty from/to, remove this workaround
MIN_DATE = "0001-01-01"
MAX_DATE = "2999-01-01"


-def _col_as_computable(col: ColumnDefinition):
+def _col_as_computable(col: ColumnDefinition) -> Union[Attribute, SimpleMetric]:
item_type, item_id = col.options["id"].split("/")

# since all cols are from the compute table, the uniqueness of local_id is ensured...
@@ -223,30 +239,130 @@ def _sanitize_timestamp(self, value):
def get_computable_for_col_name(self, column_name):
return _col_as_computable(self._columns[column_name])

-    def _execute_compute(self, quals, columns, sortKeys=None):
@staticmethod
def _date_to_str(date: datetime.date) -> str:
return date.strftime("%Y-%m-%d")

def _get_date_filter(self, operator: str, value: datetime.date, label: ObjId):
date_from = MIN_DATE
date_to = MAX_DATE
add_filter = True
        # AbsoluteDateFilter supports only day granularity
        # date_to must equal qual.value + 1 day if the day of qual.value is to be included
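        # e.g. operator "=" with value 2021-01-01 yields date_from "2021-01-01" and date_to "2021-01-02"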
if operator == ">=":
date_from = self._date_to_str(value)
elif operator == "<=":
date_to = self._date_to_str(value + datetime.timedelta(days=1))
elif operator == ">":
date_from = self._date_to_str(value + datetime.timedelta(days=1))
elif operator == "<":
date_to = self._date_to_str(value)
elif operator == "=":
date_from = self._date_to_str(value)
date_to = self._date_to_str(value + datetime.timedelta(days=1))
else:
add_filter = False

if add_filter:
date_filter = AbsoluteDateFilter(label, date_from, date_to)
_log_debug(f"extract_filters_from_quals: date_filter={date_filter.__dict__}")
return date_filter
else:
return None

def qual_to_date_filter(self, filters: list[Filter], filter_entity: Attribute, qual: Qual):
_log_debug(f"extract_filters_from_quals: filter_column={filter_entity} is date attribute")
        # Hack - AbsoluteDateFilter requires the <date_dataset>.day label, but the user can filter using e.g. a month granularity label
re_day = re.compile(r"(.*)\.[^.]+$")
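        # e.g. label id "datetime.day" -> ObjId("datetime", "dataset")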
label = ObjId(re_day.sub(r"\1", filter_entity.label.id), "dataset")
if isinstance(qual.operator, tuple):
# Can't be implemented by multiple filters, because GD.CN does not support OR between filters
_log_debug("extract_filters_from_quals: IN (date1, date2, ..) is not supported")
else:
date_filter = self._get_date_filter(qual.operator, qual.value, label)
if date_filter:
filters.append(date_filter)

@staticmethod
def qual_to_attribute_filter(filters: list[Filter], filter_entity: Attribute, qual: Qual):
_log_debug(f"extract_filters_from_quals: filter_column={filter_entity} is normal attribute")
if isinstance(qual.operator, tuple):
values = qual.value
positive = qual.operator[1]
else:
values = [qual.value]
positive = qual.operator == "="
_log_debug(f"extract_filters_from_quals: values={values} positive={positive}")
if positive:
filters.append(PositiveAttributeFilter(filter_entity, values))
else:
filters.append(NegativeAttributeFilter(filter_entity, values))

@staticmethod
def _is_qual_date(qual: Qual):
return isinstance(qual.value, datetime.date) or (
isinstance(qual.value, list) and isinstance(qual.value[0], datetime.date)
)

def extract_filters_from_quals(self, quals: list[Qual]) -> list[Filter]:
"""
        Convert Multicorn quals to GD.CN filters.
        Currently, simple attribute filters and absolute date filters are supported.
        :param quals: Multicorn quals representing filters from the SQL WHERE clause
        :return: list of GD.CN filters
"""
filters = []
for qual in quals:
_log_info(
f"extract_filters_from_quals: field_name={qual.field_name} operator={qual.operator} value={qual.value}"
)
filter_entity = self.get_computable_for_col_name(qual.field_name)
if filter_entity:
if isinstance(filter_entity, Attribute):
_log_debug(f"extract_filters_from_quals: filter_entity={filter_entity} is attribute")
if self._is_qual_date(qual):
self.qual_to_date_filter(filters, filter_entity, qual)
else:
self.qual_to_attribute_filter(filters, filter_entity, qual)
else:
_log_info(
f"extract_filters_from_quals: field_name={qual.field_name} is not attribute, "
+ f"but {type(filter_entity)}"
)
else:
_log_info(
f"extract_filters_from_quals: field_name={qual.field_name} not found in report columns, "
+ "cannot push it down"
)
return filters

+    def _execute_compute(self, quals: list[Qual], columns, sortKeys=None):
"""
Computes data for the 'compute' pseudo-table. The 'compute' table is special. It does not behave as other
relational tables: the input columns determine what data will be calculated and the cardinality of the result
fully depends on the input columns.
"""
-        # TODO: pushdown some of the filters that are included in quals
         items = [self.get_computable_for_col_name(col_name) for col_name in columns]
-        table = self._sdk.tables.for_items(self._workspace, items)
+        # TODO: pushdown more filters that are included in quals
+        filters = self.extract_filters_from_quals(quals)
+        table = self._sdk.tables.for_items(self._workspace, items, filters)

for row in table.read_all():
sanitized_row = {k: self._sanitize_value(k, v) for k, v in row.items()}
yield sanitized_row

-    def _execute_custom_report(self, quals, columns, sortKeys=None):
+    def _execute_custom_report(self, quals: list[Qual], columns, sortKeys=None):
"""
Computes data for manually created table that maps to particular workspace and its columns map to label, fact or
metric in that workspace. The mapping conventions are same as for the 'compute' pseudo-table. Compared to the
pseudo-table though, the custom report execution always computes data for all columns - thus appears like
any other relational table.
"""
-        # TODO: pushdown some of the filters that are included in quals
         items = [_col_as_computable(col) for col in self._columns.values()]
-        table = self._sdk.tables.for_items(self._workspace, items)
+        # TODO: pushdown more filters that are included in quals
+        filters = self.extract_filters_from_quals(quals)
+        table = self._sdk.tables.for_items(self._workspace, items, filters)

# TODO: it is likely that this has to change to support DATE and TIMESTAMP. have mapping that need to be
# timestamp/date, instead of returning generator, iterate rows, convert to dates and yield the converted row
@@ -255,7 +371,7 @@ def _execute_custom_report(self, quals, columns, sortKeys=None):
return table.read_all()

def execute(self, quals, columns, sortkeys=None):
_log_debug(f"query in fdw with options {self._options}; columns {type(columns)}")
_log_debug(f"query in fdw with options {self._options}; columns {columns}; quals={quals}")

if self._insight:
return self._execute_insight(quals, columns, sortkeys)
5 changes: 5 additions & 0 deletions gooddata-fdw/tests/conftest.py
@@ -77,5 +77,10 @@ def test_compute_table_columns():
type_name="VARCHAR(255)",
options=dict(id="label/car.car_model"),
)
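    # DATE column so that the date-filter push-down can be exercised in tests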
columns["datetime"] = ColumnDefinition(
column_name="datetime",
type_name="DATE",
options=dict(id="label/datetime.day"),
)

return columns
6 changes: 3 additions & 3 deletions gooddata-fdw/tests/execute/test_execute_compute_table.py
@@ -15,7 +15,7 @@
def test_execute_compute_table_all_columns(fdw_options_for_compute_table, test_compute_table_columns):
fdw = GoodDataForeignDataWrapper(fdw_options_for_compute_table, test_compute_table_columns)

-    results = list(row for row in fdw.execute(None, ["coverage_lifetime", "claim_amount", "car_make", "car_model"]))
+    results = list(row for row in fdw.execute([], ["coverage_lifetime", "claim_amount", "car_make", "car_model"]))

# this is cardinality when selecting on finest granularity (all labels in compute table)
assert len(results) == 101
@@ -31,7 +31,7 @@ def test_execute_compute_table_all_columns(fdw_options_for_compute_table, test_c
def test_execute_compute_table_metrics_only(fdw_options_for_compute_table, test_compute_table_columns):
fdw = GoodDataForeignDataWrapper(fdw_options_for_compute_table, test_compute_table_columns)

-    results = list(row for row in fdw.execute(None, ["coverage_lifetime", "claim_amount"]))
+    results = list(row for row in fdw.execute([], ["coverage_lifetime", "claim_amount"]))

# selecting just metrics means no granularity and full aggregation of the metric values
assert len(results) == 1
@@ -45,7 +45,7 @@ def test_execute_compute_table_metrics_only(fdw_options_for_compute_table, test_
def test_execute_compute_table_with_reduced_granularity(fdw_options_for_compute_table, test_compute_table_columns):
fdw = GoodDataForeignDataWrapper(fdw_options_for_compute_table, test_compute_table_columns)

-    results = list(row for row in fdw.execute(None, ["car_make", "coverage_lifetime", "claim_amount"]))
+    results = list(row for row in fdw.execute([], ["car_make", "coverage_lifetime", "claim_amount"]))

# selecting on reduced granularity (1 label instead of both) means the metric values are aggregated for
# that one label only - cardinality differs again
4 changes: 2 additions & 2 deletions gooddata-fdw/tests/execute/test_execute_insight.py
@@ -15,7 +15,7 @@
def test_execute_insight_all_columns(fdw_options_for_insight, test_insight_columns):
fdw = GoodDataForeignDataWrapper(fdw_options_for_insight, test_insight_columns)

-    results = list(row for row in fdw.execute(None, ["car_car_make", "customer_customer_age_group", "premium_revenue"]))
+    results = list(row for row in fdw.execute([], ["car_car_make", "customer_customer_age_group", "premium_revenue"]))

assert len(results) == 146
first_row = results[0]
@@ -29,7 +29,7 @@ def test_execute_insight_some_columns(fdw_options_for_insight, test_insight_colum
def test_execute_insight_some_columns(fdw_options_for_insight, test_insight_columns):
fdw = GoodDataForeignDataWrapper(fdw_options_for_insight, test_insight_columns)

-    results = list(row for row in fdw.execute(None, ["premium_revenue"]))
+    results = list(row for row in fdw.execute([], ["premium_revenue"]))

# selecting only some cols behaves like in normal table - the cardinality is same, the result rows
# contain just the selected cols
52 changes: 52 additions & 0 deletions gooddata-fdw/tests/unit_test/test_quals.py
@@ -0,0 +1,52 @@
# (C) 2021 GoodData Corporation

import datetime

import pytest

from gooddata_fdw.environment import Qual
from gooddata_fdw.fdw import MAX_DATE, MIN_DATE, GoodDataForeignDataWrapper
from gooddata_sdk.compute_model import AbsoluteDateFilter, ObjId, PositiveAttributeFilter

start_date = datetime.date(2021, 1, 1)
end_date = datetime.date(2021, 2, 1)


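# Each test case: [input Multicorn quals, expected GD.CN filters]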
test_data = [
[
[Qual("datetime", ">=", start_date), Qual("car_model", ("=", True), ["Tesla", "Škoda"])],
[
AbsoluteDateFilter(ObjId("datetime", "dataset"), "2021-01-01", MAX_DATE),
PositiveAttributeFilter("car_model", ["Tesla", "Škoda"]),
],
],
[
# This represents SQL BETWEEN operation
[Qual("datetime", ">=", start_date), Qual("datetime", "<=", end_date)],
[
AbsoluteDateFilter(ObjId("datetime", "dataset"), "2021-01-01", MAX_DATE),
AbsoluteDateFilter(ObjId("datetime", "dataset"), MIN_DATE, "2021-02-02"),
],
],
[
[Qual("datetime", "=", start_date)],
[
AbsoluteDateFilter(ObjId("datetime", "dataset"), "2021-01-01", "2021-01-02"),
],
],
[
[Qual("datetime", ">", start_date), Qual("datetime", "<", end_date)],
[
AbsoluteDateFilter(ObjId("datetime", "dataset"), "2021-01-02", MAX_DATE),
AbsoluteDateFilter(ObjId("datetime", "dataset"), MIN_DATE, "2021-02-01"),
],
],
]


@pytest.mark.parametrize("quals,expected", test_data)
def test_quals(fdw_options_for_compute_table, test_compute_table_columns, quals, expected):
fdw = GoodDataForeignDataWrapper(fdw_options_for_compute_table, test_compute_table_columns)
filters = fdw.extract_filters_from_quals(quals)

assert filters == expected
8 changes: 4 additions & 4 deletions gooddata-fdw/tox.ini
@@ -4,10 +4,10 @@ envlist = py39, py38, py37
[testenv]
deps =
-r{toxinidir}/test-requirements.txt
-    ../gooddata-metadata-client
-    ../gooddata-afm-client
-    ../gooddata-scan-client
-    ../gooddata-sdk
+    -e../gooddata-metadata-client
+    -e../gooddata-afm-client
+    -e../gooddata-scan-client
+    -e../gooddata-sdk

commands =
pytest -v --cov=gooddata_fdw tests {posargs}
6 changes: 3 additions & 3 deletions gooddata-pandas/tests/dataframe/test_indexed_dataframe.py
@@ -56,7 +56,7 @@ def test_simple_index_filtered_metrics_and_label(gdf: DataFrameFactory):
safety_scale="fact/region.region_safety_scale",
region_code="label/region.region_code",
),
filter_by=[PositiveAttributeFilter(label="reg", in_values=["Bern"])],
filter_by=[PositiveAttributeFilter(label="reg", values=["Bern"])],
)

assert len(df) == 1
@@ -110,7 +110,7 @@ def test_multi_index_filtered_metrics_and_label(gdf: DataFrameFactory):
claim_count="metric/claim-count",
region_code="label/region.region_code",
),
-        filter_by=[PositiveAttributeFilter(label="reg", in_values=["Bern"])],
+        filter_by=[PositiveAttributeFilter(label="reg", values=["Bern"])],
)

assert len(df) == 1
@@ -136,7 +136,7 @@ def test_multi_index_filtered_metrics_and_label_reuse(gdf: DataFrameFactory):
claim_count="metric/claim-count",
reg="label/region.region_name",
),
-        filter_by=[PositiveAttributeFilter(label="reg_idx", in_values=["Bern"])],
+        filter_by=[PositiveAttributeFilter(label="reg_idx", values=["Bern"])],
)

assert len(df) == 1