[snowflake] Support ignoring missing tables when using `fetch_last_updated_timestamps` (#25488)

## Summary & Motivation
This PR adds an optional argument, `ignore_missing_tables: Optional[bool] = False`, to `dagster_snowflake.fetch_last_updated_timestamps`. When set to `True`, the function no longer raises an error when a table is not found in Snowflake; instead, it skips the table and excludes it from the resulting freshness map.
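
For illustration, a minimal sketch of the new behavior. The connection settings and table names here are hypothetical, not part of this PR:

```python
from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

# Hypothetical resource configuration; any working Snowflake connection will do.
snowflake = SnowflakeResource(
    account="my_account", user="my_user", password="my_password", database="TESTDB"
)

with snowflake.get_connection() as conn:
    freshness = fetch_last_updated_timestamps(
        snowflake_connection=conn,
        schema="TESTSCHEMA",
        # "missing_table" does not exist in this environment.
        tables=["existing_table", "missing_table"],
        ignore_missing_tables=True,  # skip it instead of raising ValueError
    )
    # Only tables that actually exist appear in the result.
    assert "missing_table" not in freshness
```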

This is useful when you deploy a Dagster project to multiple environments and your development environment does not have every table available. We use this function to generate observation events for source tables; with this change, observations are generated for the tables that do exist, while the tables that don't are ignored.
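
As a sketch of that pattern, assuming a `context` (e.g. an `OpExecutionContext`) and an open Snowflake connection; the function name and table names are illustrative, not part of this PR:

```python
from dagster import AssetObservation, MetadataValue, OpExecutionContext
from dagster_snowflake import fetch_last_updated_timestamps


def observe_source_tables(context: OpExecutionContext, conn) -> None:
    """Emit an observation for every source table that exists in this environment."""
    freshness = fetch_last_updated_timestamps(
        snowflake_connection=conn,
        schema="TESTSCHEMA",
        tables=["orders", "customers"],  # hypothetical source tables
        ignore_missing_tables=True,  # tables absent in this environment are skipped
    )
    for table, last_updated in freshness.items():
        context.log_event(
            AssetObservation(
                asset_key=table,
                metadata={"last_updated": MetadataValue.timestamp(last_updated)},
            )
        )
```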

Linked issue: #24245

## How I Tested These Changes

A unit test is included.

## Changelog

> `dagster_snowflake.fetch_last_updated_timestamps` now supports ignoring tables not found in Snowflake instead of raising an error.
QuintenBruynseraede authored Oct 29, 2024
1 parent 2e54e69 commit 1a7d3f6
Showing 2 changed files with 46 additions and 0 deletions.
@@ -728,6 +728,7 @@ def fetch_last_updated_timestamps(
    schema: str,
    tables: Sequence[str],
    database: Optional[str] = None,
    ignore_missing_tables: Optional[bool] = False,
) -> Mapping[str, datetime]:
    """Fetch the last updated times of a list of tables in Snowflake.
@@ -741,6 +742,8 @@ def fetch_last_updated_timestamps(
        tables (Sequence[str]): A list of table names to fetch the last updated time for.
        database (Optional[str]): The database of the table. Only required if the connection
            has not been set with a database.
        ignore_missing_tables (Optional[bool]): If True, tables not found in Snowflake
            will be excluded from the result.

    Returns:
        Mapping[str, datetime]: A dictionary of table names to their last updated time in UTC.
@@ -766,6 +769,8 @@ def fetch_last_updated_timestamps(
    result_correct_case = {}
    for table_name in tables:
        if table_name.upper() not in result_mapping:
            if ignore_missing_tables:
                continue
            raise ValueError(f"Table {table_name} could not be found.")
        last_altered = result_mapping[table_name.upper()]
        check.invariant(
@@ -298,6 +298,47 @@ def test_fetch_last_updated_timestamps_empty():
)


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.importorskip(
    "snowflake.sqlalchemy", reason="sqlalchemy is not available in the test environment"
)
@pytest.mark.integration
def test_fetch_last_updated_timestamps_missing_table():
    with SnowflakeResource(
        connector="sqlalchemy",
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        user=os.environ["SNOWFLAKE_USER"],
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        database="TESTDB",
        schema="TESTSCHEMA",
    ).get_connection() as conn:
        table_name = f"test_table_{str(uuid.uuid4()).replace('-', '_')}".lower()
        try:
            conn.cursor().execute(f"create table {table_name} (foo string)")
            conn.cursor().execute(f"insert into {table_name} values ('bar')")

            with pytest.raises(ValueError):
                fetch_last_updated_timestamps(
                    snowflake_connection=conn,
                    database="TESTDB",
                    # The second (reversed) table name does not exist, so a ValueError is expected.
                    tables=[table_name, table_name[::-1]],
                    schema="TESTSCHEMA",
                )

            freshness = fetch_last_updated_timestamps(
                snowflake_connection=conn,
                database="TESTDB",
                tables=[table_name, table_name[::-1]],
                schema="TESTSCHEMA",
                ignore_missing_tables=True,
            )
            assert table_name in freshness
            assert len(freshness) == 1
        finally:
            conn.cursor().execute(f"drop table if exists {table_name}")


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.integration
@pytest.mark.parametrize("db_str", [None, "TESTDB"], ids=["db_from_resource", "db_from_param"])
