Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(XCom): /xcom/list got exception when applying filter on the value column #45679

Draft
wants to merge 1 commit into
base: v2-10-test
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,73 @@ class UtcAwareFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"""Retrieve conversion tables for UTC-Aware filters."""


class XComFilterStartsWith(fab_sqlafilters.FilterStartsWith):
"""Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike(value + "%"))


class XComFilterEndsWith(fab_sqlafilters.FilterEndsWith):
"""Ends With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike("%" + value))


class XComFilterEqual(fab_sqlafilters.FilterEqual):
"""Equality filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
value = set_value_to_type(self.datamodel, self.column_name, value)
return query.filter(field.cast(types.String) == value)


class XComFilterContains(fab_sqlafilters.FilterContains):
"""Not Equal To filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(field.cast(types.String).ilike("%" + value + "%"))


class XComFilterNotStartsWith(fab_sqlafilters.FilterNotStartsWith):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike(value + "%"))


class XComFilterNotEndsWith(fab_sqlafilters.FilterNotEndsWith):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike(value + "%"))


class XComFilterNotContains(fab_sqlafilters.FilterNotContains):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
return query.filter(~field.cast(types.String).ilike("%" + value + "%"))


class XComFilterNotEqual(fab_sqlafilters.FilterNotEqual):
"""Not Starts With filter for XCom values."""

def apply(self, query, value):
query, field = get_field_setup_query(query, self.model, self.column_name)
value = set_value_to_type(self.datamodel, self.column_name, value)

return query.filter(field.cast(types.String) != value)


class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"""Retrieve conversion tables for Airflow-specific filters."""

Expand All @@ -800,6 +867,19 @@ class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"is_extendedjson",
[],
),
(
"is_xcom_value",
[
XComFilterStartsWith,
XComFilterEndsWith,
XComFilterEqual,
XComFilterContains,
XComFilterNotStartsWith,
XComFilterNotEndsWith,
XComFilterNotContains,
XComFilterNotEqual,
],
),
*fab_sqlafilters.SQLAFilterConverter.conversion_table,
)

Expand Down Expand Up @@ -864,6 +944,10 @@ def is_extendedjson(self, col_name):
)
return False

def is_xcom_value(self, col_name):
"""Check if it is col_name is value of xcom table."""
return col_name == "value" and self.obj.__tablename__ == "xcom"

def get_col_default(self, col_name: str) -> Any:
if col_name not in self.list_columns:
# Handle AssociationProxy etc, or anything that isn't a "real" column
Expand Down