diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py
index 68769eb095b59..61c13f504c193 100644
--- a/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py
+++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py
@@ -728,6 +728,7 @@ def fetch_last_updated_timestamps(
     schema: str,
     tables: Sequence[str],
     database: Optional[str] = None,
+    ignore_missing_tables: Optional[bool] = False,
 ) -> Mapping[str, datetime]:
     """Fetch the last updated times of a list of tables in Snowflake.
 
@@ -741,6 +742,8 @@ def fetch_last_updated_timestamps(
         tables (Sequence[str]): A list of table names to fetch the last updated time for.
         database (Optional[str]): The database of the table. Only required if the connection
             has not been set with a database.
+        ignore_missing_tables (Optional[bool]): If True, tables not found in Snowflake are
+            omitted from the result instead of raising a ValueError. Defaults to False.
 
     Returns:
         Mapping[str, datetime]: A dictionary of table names to their last updated time in UTC.
@@ -766,6 +769,8 @@ def fetch_last_updated_timestamps(
     result_correct_case = {}
     for table_name in tables:
         if table_name.upper() not in result_mapping:
+            if ignore_missing_tables:
+                continue
             raise ValueError(f"Table {table_name} could not be found.")
         last_altered = result_mapping[table_name.upper()]
         check.invariant(
diff --git a/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py b/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py
index 5ecb47604cc46..965a7908af357 100644
--- a/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py
+++ b/python_modules/libraries/dagster-snowflake/dagster_snowflake_tests/test_resources.py
@@ -298,6 +298,51 @@ def test_fetch_last_updated_timestamps_empty():
         )
 
 
+@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
+@pytest.mark.importorskip(
+    "snowflake.sqlalchemy", reason="sqlalchemy is not available in the test environment"
+)
+@pytest.mark.integration
+def test_fetch_last_updated_timestamps_missing_table():
+    """Missing tables raise ValueError unless ignore_missing_tables=True is passed."""
+    with SnowflakeResource(
+        connector="sqlalchemy",
+        account=os.getenv("SNOWFLAKE_ACCOUNT"),
+        user=os.environ["SNOWFLAKE_USER"],
+        password=os.getenv("SNOWFLAKE_PASSWORD"),
+        database="TESTDB",
+        schema="TESTSCHEMA",
+    ).get_connection() as conn:
+        table_name = f"test_table_{str(uuid.uuid4()).replace('-', '_')}".lower()
+        # Reverse with a slice: reversed() returns an iterator, not a str, so using it as
+        # a table name would fail on .upper() instead of hitting the missing-table path.
+        missing_table_name = table_name[::-1]
+        try:
+            conn.cursor().execute(f"create table {table_name} (foo string)")
+            conn.cursor().execute(f"insert into {table_name} values ('bar')")
+
+            # The second table does not exist, so the default behavior is to raise.
+            with pytest.raises(ValueError):
+                fetch_last_updated_timestamps(
+                    snowflake_connection=conn,
+                    database="TESTDB",
+                    tables=[table_name, missing_table_name],
+                    schema="TESTSCHEMA",
+                )
+
+            freshness = fetch_last_updated_timestamps(
+                snowflake_connection=conn,
+                database="TESTDB",
+                tables=[table_name, missing_table_name],
+                schema="TESTSCHEMA",
+                ignore_missing_tables=True,
+            )
+            assert table_name in freshness
+            assert len(freshness) == 1
+        finally:
+            conn.cursor().execute(f"drop table if exists {table_name}")
+
+
 @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
 @pytest.mark.integration
 @pytest.mark.parametrize("db_str", [None, "TESTDB"], ids=["db_from_resource", "db_from_param"])