From 1a7d3f691997bbf0baa76d7433ed3bffd6c5c86e Mon Sep 17 00:00:00 2001 From: Quinten Bruynseraede Date: Tue, 29 Oct 2024 20:58:46 +0100 Subject: [PATCH] [snowflake] Support ignoring missing tables when using `fetch_last_updated_timestamps` (#25488) ## Summary & Motivation This PR adds an optional argument `ignore_missing_tables: Optional[bool] = False` to `dagster_snowflake.fetch_last_updated_timestamps`. When true, this argument will not raise an error when a table is not found in Snowflake. Instead, it will just continue and exclude the table from the resulting freshness map. This is useful when you deploy a dagster project to multiple environments, where your development environment may not have all tables available. We use this function to generate observation events for source tables, and this allows us to generate observations for the tables that do exist, but ignore tables that don't. Linked issue: https://github.com/dagster-io/dagster/issues/24245 ## How I Tested These Changes Unit test is included. ## Changelog > `dagster_snowflake.fetch_last_updated_timestamps` now supports ignoring tables not found in Snowflake instead of raising an error --- .../dagster_snowflake/resources.py | 5 +++ .../dagster_snowflake_tests/test_resources.py | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py index 68769eb095b59..61c13f504c193 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py @@ -728,6 +728,7 @@ def fetch_last_updated_timestamps( schema: str, tables: Sequence[str], database: Optional[str] = None, + ignore_missing_tables: Optional[bool] = False, ) -> Mapping[str, datetime]: """Fetch the last updated times of a list of tables in Snowflake. @@ -741,6 +742,8 @@ def fetch_last_updated_timestamps( tables (Sequence[str]): A list of table names to fetch the last updated time for. database (Optional[str]): The database of the table. Only required if the connection has not been set with a database. + ignore_missing_tables (Optional[bool]): If True, tables not found in Snowflake + will be excluded from the result. Returns: Mapping[str, datetime]: A dictionary of table names to their last updated time in UTC. @@ -766,6 +769,8 @@ def fetch_last_updated_timestamps( result_correct_case = {} for table_name in tables: if table_name.upper() not in result_mapping: + if ignore_missing_tables: + continue raise ValueError(f"Table {table_name} could not be found.") last_altered = result_mapping[table_name.upper()] check.invariant( diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py index 5ecb47604cc46..965a7908af357 100644 --- a/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py +++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py @@ -298,6 +298,47 @@ def test_fetch_last_updated_timestamps_empty(): ) +@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB") +@pytest.mark.importorskip( + "snowflake.sqlalchemy", reason="sqlalchemy is not available in the test environment" +) +@pytest.mark.integration +def test_fetch_last_updated_timestamps_missing_table(): + with SnowflakeResource( + connector="sqlalchemy", + account=os.getenv("SNOWFLAKE_ACCOUNT"), + user=os.environ["SNOWFLAKE_USER"], + password=os.getenv("SNOWFLAKE_PASSWORD"), + database="TESTDB", + schema="TESTSCHEMA", + ).get_connection() as conn: + table_name = f"test_table_{str(uuid.uuid4()).replace('-', '_')}".lower() + try: + conn.cursor().execute(f"create table {table_name} (foo string)") + conn.cursor().execute(f"insert into {table_name} values ('bar')") + + with pytest.raises(ValueError): + freshness = fetch_last_updated_timestamps( + snowflake_connection=conn, + database="TESTDB", + # Second table does not exist, expects ValueError + tables=[table_name, reversed(table_name)], + schema="TESTSCHEMA", + ) + + freshness = fetch_last_updated_timestamps( + snowflake_connection=conn, + database="TESTDB", + tables=[table_name, reversed(table_name)], + schema="TESTSCHEMA", + ignore_missing_tables=True, + ) + assert table_name in freshness + assert len(freshness) == 1 + finally: + conn.cursor().execute(f"drop table if exists {table_name}") + + @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB") @pytest.mark.integration @pytest.mark.parametrize("db_str", [None, "TESTDB"], ids=["db_from_resource", "db_from_param"])