From 27729688dc012b32ef5aebc4d6ea03968ab8b572 Mon Sep 17 00:00:00 2001
From: jamiedemaria
Date: Thu, 24 Oct 2024 17:34:07 -0400
Subject: [PATCH] storage reads from backfill tags table and backfill job_name
 column if they are populated (#25497)

## Summary & Motivation

See https://github.com/dagster-io/dagster/pull/25460 for additional context.
Updates the backfill queries to use the job_name column and the backfill tags
table if the migrations have been run.

## How I Tested These Changes

## Changelog

Added a new column to the BulkActions table and a new BackfillTags table to
improve performance when filtering Backfills. To take advantage of these
performance improvements, run `dagster instance migrate`. This migration
involves a schema migration to add the new column and table, and a data
migration to populate the new column for historical backfills.

---
 .../_core/storage/runs/sql_run_storage.py     | 100 ++++++++++++------
 .../compat_tests/test_back_compat.py          |  85 ++++++++++++++-
 .../compat_tests/test_back_compat.py          |  85 ++++++++++++++-
 .../dagster_mysql_tests/test_run_storage.py   |   1 +
 .../compat_tests/test_back_compat.py          |  88 ++++++++++++++-
 5 files changed, 314 insertions(+), 45 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
index 72caadc27cb16..cbdd76922c3e0 100644
--- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
+++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
@@ -57,6 +57,7 @@
 )
 from dagster._core.storage.runs.base import RunStorage
 from dagster._core.storage.runs.migration import (
+    BACKFILL_JOB_NAME_AND_TAGS,
     OPTIONAL_DATA_MIGRATIONS,
     REQUIRED_DATA_MIGRATIONS,
     RUN_BACKFILL_ID,
@@ -862,49 +863,76 @@ def wipe_daemon_heartbeats(self) -> None:
         # https://stackoverflow.com/a/54386260/324449
         conn.execute(DaemonHeartbeatsTable.delete())
 
-    def _backfills_query(self, filters: Optional[BulkActionsFilter] = None):
-        query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
-        if filters and filters.tags:
-            # Backfills do not have a corresponding tags table. However, all tags that are on a backfill are
-            # applied to the runs the backfill launches. So we can query for runs that match the tags and
-            # are also part of a backfill to find the backfills that match the tags.
-
-            backfills_with_tags_query = db_select([RunTagsTable.c.value]).where(
-                RunTagsTable.c.key == BACKFILL_ID_TAG
-            )
-
+    def _add_backfill_filters_to_table(
+        self, table: db.Table, filters: Optional[BulkActionsFilter]
+    ) -> db.Table:
+        if filters and filters.tags and self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
             for i, (key, value) in enumerate(filters.tags.items()):
-                run_tags_alias = db.alias(RunTagsTable, f"run_tags_filter{i}")
-                backfills_with_tags_query = backfills_with_tags_query.where(
+                backfill_tags_alias = db.alias(BackfillTagsTable, f"backfill_tags_filter{i}")
+
+                table = table.join(
+                    backfill_tags_alias,
                     db.and_(
-                        RunTagsTable.c.run_id == run_tags_alias.c.run_id,
-                        run_tags_alias.c.key == key,
-                        (run_tags_alias.c.value == value)
+                        BulkActionsTable.c.key == backfill_tags_alias.c.backfill_id,
+                        backfill_tags_alias.c.key == key,
+                        (backfill_tags_alias.c.value == value)
                         if isinstance(value, str)
-                        else run_tags_alias.c.value.in_(value),
+                        else backfill_tags_alias.c.value.in_(value),
                     ),
                 )
+            return table
+        return table
+
+    def _backfills_query(self, filters: Optional[BulkActionsFilter] = None):
+        query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
+        if filters and filters.tags:
+            if not self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+                # If the migration has been run, the tags filter was already applied by joining
+                # against the BackfillTags table in _add_backfill_filters_to_table. In this branch
+                # the BackfillTags table has not been built. However, all tags that are on a
+                # backfill are applied to the runs the backfill launches. So we can query for runs
+                # that match the tags and are also part of a backfill to find the backfills that
+                # match the tags.
+
+                backfills_with_tags_query = db_select([RunTagsTable.c.value]).where(
+                    RunTagsTable.c.key == BACKFILL_ID_TAG
+                )
+
+                for i, (key, value) in enumerate(filters.tags.items()):
+                    run_tags_alias = db.alias(RunTagsTable, f"run_tags_filter{i}")
+                    backfills_with_tags_query = backfills_with_tags_query.where(
+                        db.and_(
+                            RunTagsTable.c.run_id == run_tags_alias.c.run_id,
+                            run_tags_alias.c.key == key,
+                            (run_tags_alias.c.value == value)
+                            if isinstance(value, str)
+                            else run_tags_alias.c.value.in_(value),
+                        ),
+                    )
 
-            query = query.where(BulkActionsTable.c.key.in_(db_subquery(backfills_with_tags_query)))
+                query = query.where(
+                    BulkActionsTable.c.key.in_(db_subquery(backfills_with_tags_query))
+                )
         if filters and filters.job_name:
-            run_tags_table = RunTagsTable
+            if self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+                query = query.where(BulkActionsTable.c.job_name == filters.job_name)
+            else:
+                run_tags_table = RunTagsTable
 
-            runs_in_backfill_with_job_name = run_tags_table.join(
-                RunsTable,
-                db.and_(
-                    RunTagsTable.c.run_id == RunsTable.c.run_id,
-                    RunTagsTable.c.key == BACKFILL_ID_TAG,
-                    RunsTable.c.pipeline_name == filters.job_name,
-                ),
-            )
+                runs_in_backfill_with_job_name = run_tags_table.join(
+                    RunsTable,
+                    db.and_(
+                        RunTagsTable.c.run_id == RunsTable.c.run_id,
+                        RunTagsTable.c.key == BACKFILL_ID_TAG,
+                        RunsTable.c.pipeline_name == filters.job_name,
+                    ),
+                )
 
-            backfills_with_job_name_query = db_select([RunTagsTable.c.value]).select_from(
-                runs_in_backfill_with_job_name
-            )
-            query = query.where(
-                BulkActionsTable.c.key.in_(db_subquery(backfills_with_job_name_query))
-            )
+                backfills_with_job_name_query = db_select([RunTagsTable.c.value]).select_from(
+                    runs_in_backfill_with_job_name
+                )
+                query = query.where(
+                    BulkActionsTable.c.key.in_(db_subquery(backfills_with_job_name_query))
+                )
         if filters and filters.statuses:
             query = query.where(
                 BulkActionsTable.c.status.in_([status.value for status in filters.statuses])
@@ -966,13 +994,15 @@ def get_backfills(
         if status is not None:
             filters = BulkActionsFilter(statuses=[status])
 
-        query = self._backfills_query(filters=filters)
+        table = self._add_backfill_filters_to_table(BulkActionsTable, filters)
+        query = self._backfills_query(filters=filters).select_from(table)
         query = self._add_cursor_limit_to_backfills_query(query, cursor=cursor, limit=limit)
         query = query.order_by(BulkActionsTable.c.id.desc())
         rows = self.fetchall(query)
         backfill_candidates = deserialize_values((row["body"] for row in rows), PartitionBackfill)
-        if filters and filters.tags:
+        if filters and filters.tags and not self.has_built_index(BACKFILL_JOB_NAME_AND_TAGS):
+            # if we are still using the run tags table to get backfills by tag, we need to do an additional check.
             # runs can have more tags than the backfill that launched them. Since we filtered tags by
             # querying for runs with those tags, we need to do an additional check that the backfills
             # also have the requested tags
diff --git a/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_back_compat.py b/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_back_compat.py
index 96e47a16c6f17..b57de6c8167c6 100644
--- a/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_back_compat.py
+++ b/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_back_compat.py
@@ -31,7 +31,7 @@
 from dagster._core.errors import DagsterInvalidInvocationError
 from dagster._core.events import DagsterEvent, StepMaterializationData
 from dagster._core.events.log import EventLogEntry
-from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
 from dagster._core.execution.plan.outputs import StepOutputHandle
 from dagster._core.execution.plan.state import KnownExecutionState
 from dagster._core.instance import DagsterInstance, InstanceRef
@@ -45,7 +45,7 @@
 from dagster._core.storage.event_log.migration import migrate_event_log_data
 from dagster._core.storage.event_log.sql_event_log import SqlEventLogStorage
 from dagster._core.storage.migration.utils import upgrading_instance
-from dagster._core.storage.runs.migration import RUN_BACKFILL_ID
+from dagster._core.storage.runs.migration import BACKFILL_JOB_NAME_AND_TAGS, RUN_BACKFILL_ID
 from dagster._core.storage.sqlalchemy_compat import db_select
 from dagster._core.storage.tags import (
     BACKFILL_ID_TAG,
@@ -1338,6 +1338,23 @@ def test_add_backfill_tags():
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected tags
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name="foo",
+                run_id=make_new_run_id(),
+                tags={"before": "migration", BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by tags works before migration
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
 
         instance.upgrade()
         assert "backfill_tags" in get_sqlite3_tables(db_path)
@@ -1362,6 +1379,24 @@ def test_add_backfill_tags():
         assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
         assert ids_to_tags[after_migration.backfill_id] == after_migration.tags
 
+        # filtering by tags works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that tags filtering is happening on the
+        # backfill_tags table
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"after": "migration"}))[
+                0
+            ].backfill_id
+            == after_migration.backfill_id
+        )
+
         # test downgrade
         instance._run_storage._alembic_downgrade(rev="1aca709bba64")
         assert get_current_alembic_version(db_path) == "1aca709bba64"
@@ -1392,7 +1427,7 @@ def test_add_bulk_actions_job_name_column():
                 ),
                 repository_name="the_repo",
             ),
-            partition_set_name=partition_set_snap_name_for_job_name("foo"),
+            partition_set_name=partition_set_snap_name_for_job_name("before_migration"),
         )
         before_migration = PartitionBackfill(
             "before_job_migration",
@@ -1403,12 +1438,38 @@ def test_add_bulk_actions_job_name_column():
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected job_name
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name=before_migration.job_name,
+                run_id=make_new_run_id(),
+                tags={BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by job_name works before migration
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
 
         instance.upgrade()
         backfill_columns = get_sqlite3_columns(db_path, "bulk_actions")
         assert "job_name" in backfill_columns
 
+        partition_set_origin = RemotePartitionSetOrigin(
+            repository_origin=RemoteRepositoryOrigin(
+                code_location_origin=GrpcServerCodeLocationOrigin(
+                    host="localhost", port=1234, location_name="test_location"
+                ),
+                repository_name="the_repo",
+            ),
+            partition_set_name=partition_set_snap_name_for_job_name("after_migration"),
+        )
         after_migration = PartitionBackfill(
             "after_job_migration",
             partition_set_origin=partition_set_origin,
@@ -1429,6 +1490,24 @@ def test_add_bulk_actions_job_name_column():
         assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
         assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name
 
+        # filtering by job_name works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that job_name filtering is happening
+        # on the bulk_actions job_name column
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=after_migration.job_name)
+            )[0].backfill_id
+            == after_migration.backfill_id
+        )
+
         # test downgrade
         instance._run_storage._alembic_downgrade(rev="1aca709bba64")
diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/test_back_compat.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/test_back_compat.py
index d968ec71b7c4f..ba1199ec802c3 100644
--- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/test_back_compat.py
+++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/test_back_compat.py
@@ -10,13 +10,13 @@
 from dagster import AssetKey, AssetMaterialization, AssetObservation, Output, job, op
 from dagster._core.definitions.data_version import DATA_VERSION_TAG
 from dagster._core.errors import DagsterInvalidInvocationError
-from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
 from dagster._core.instance import DagsterInstance
 from dagster._core.remote_representation.external_data import partition_set_snap_name_for_job_name
 from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunsFilter
 from dagster._core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
 from dagster._core.storage.migration.bigint_migration import run_bigint_migration
-from dagster._core.storage.runs.migration import RUN_BACKFILL_ID
+from dagster._core.storage.runs.migration import BACKFILL_JOB_NAME_AND_TAGS, RUN_BACKFILL_ID
 from dagster._core.storage.sqlalchemy_compat import db_select
 from dagster._core.storage.tags import BACKFILL_ID_TAG
 from dagster._core.utils import make_new_run_id
@@ -616,6 +616,23 @@ def test_add_backfill_tags(conn_string):
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected tags
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name="foo",
+                run_id=make_new_run_id(),
+                tags={"before": "migration", BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by tags works before migration
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
 
         instance.upgrade()
         assert "backfill_tags" in get_tables(instance)
@@ -645,6 +662,24 @@ def test_add_backfill_tags(conn_string):
         assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
         assert ids_to_tags[after_migration.backfill_id] == after_migration.tags
 
+        # filtering by tags works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that tags filtering is happening on the
+        # backfill_tags table
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                0
+            ].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(filters=BulkActionsFilter(tags={"after": "migration"}))[
+                0
+            ].backfill_id
+            == after_migration.backfill_id
+        )
+
 
 def test_add_bulk_actions_job_name_column(conn_string):
     from dagster._core.remote_representation.origin import (
@@ -678,7 +713,7 @@ def test_add_bulk_actions_job_name_column(conn_string):
                 ),
                 repository_name="the_repo",
             ),
-            partition_set_name=partition_set_snap_name_for_job_name("foo"),
+            partition_set_name=partition_set_snap_name_for_job_name("before_migration"),
         )
         before_migration = PartitionBackfill(
             "before_migration",
@@ -689,11 +724,37 @@ def test_add_bulk_actions_job_name_column(conn_string):
             backfill_timestamp=get_current_timestamp(),
         )
         instance.add_backfill(before_migration)
+        # filtering pre-migration relies on filtering runs, so add a run with the expected job_name
+        pre_migration_run = instance.run_storage.add_run(
+            DagsterRun(
+                job_name=before_migration.job_name,
+                run_id=make_new_run_id(),
+                tags={BACKFILL_ID_TAG: before_migration.backfill_id},
+                status=DagsterRunStatus.NOT_STARTED,
+            )
+        )
+
+        # filtering by job_name works before migration
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
 
         instance.upgrade()
         assert "job_name" in get_columns(instance, "bulk_actions")
 
+        partition_set_origin = RemotePartitionSetOrigin(
+            repository_origin=RemoteRepositoryOrigin(
+                code_location_origin=GrpcServerCodeLocationOrigin(
+                    host="localhost", port=1234, location_name="test_location"
+                ),
+                repository_name="the_repo",
+            ),
+            partition_set_name=partition_set_snap_name_for_job_name("after_migration"),
+        )
         after_migration = PartitionBackfill(
             "after_migration",
             partition_set_origin=partition_set_origin,
@@ -712,3 +773,21 @@ def test_add_bulk_actions_job_name_column(conn_string):
         ids_to_job_name = {row[0]: row[1] for row in rows}
         assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
         assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name
+
+        # filtering by job_name works after migration
+        assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+        # delete the run that was added pre-migration to prove that job_name filtering is happening
+        # on the bulk_actions job_name column
+        instance.delete_run(pre_migration_run.run_id)
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=before_migration.job_name)
+            )[0].backfill_id
+            == before_migration.backfill_id
+        )
+        assert (
+            instance.get_backfills(
+                filters=BulkActionsFilter(job_name=after_migration.job_name)
+            )[0].backfill_id
+            == after_migration.backfill_id
+        )
diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py
index f6e25336b95ab..7b3d8e5c079c2 100644
--- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py
+++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py
@@ -13,6 +13,7 @@
 class TestMySQLRunStorage(TestRunStorage):
     __test__ = True
 
+    # TestMySQLRunStorage::test_backfill_tags_filtering_multiple_results
     def supports_backfill_tags_filtering_queries(self):
         return True
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/test_back_compat.py b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/test_back_compat.py
index 6fbb24fd2026f..d20da57d03761 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/test_back_compat.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/test_back_compat.py
@@ -19,13 +19,13 @@
 from dagster._core.definitions.data_version import DATA_VERSION_TAG
 from dagster._core.errors import DagsterInvalidInvocationError
 from dagster._core.execution.api import execute_job
-from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
 from dagster._core.instance import DagsterInstance
 from dagster._core.remote_representation.external_data import partition_set_snap_name_for_job_name
 from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunsFilter
 from dagster._core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
 from dagster._core.storage.migration.bigint_migration import run_bigint_migration
-from dagster._core.storage.runs.migration import RUN_BACKFILL_ID
+from dagster._core.storage.runs.migration import BACKFILL_JOB_NAME_AND_TAGS, RUN_BACKFILL_ID
 from dagster._core.storage.sqlalchemy_compat import db_select
 from dagster._core.storage.tags import BACKFILL_ID_TAG, PARTITION_NAME_TAG, PARTITION_SET_TAG
 from dagster._core.utils import make_new_run_id
@@ -1025,6 +1025,8 @@ def test_add_backfill_tags(hostname, conn_string):
             target_fd.write(template)
 
         with DagsterInstance.from_config(tempdir) as instance:
+            assert "backfill_tags" not in get_tables(instance)
+
             before_migration = PartitionBackfill(
                 "before_tag_migration",
                 serialized_asset_backfill_data="foo",
@@ -1034,7 +1036,24 @@ def test_add_backfill_tags(hostname, conn_string):
                 backfill_timestamp=get_current_timestamp(),
             )
             instance.add_backfill(before_migration)
-            assert "backfill_tags" not in get_tables(instance)
+
+            # filtering pre-migration relies on filtering runs, so add a run with the expected tags
+            pre_migration_run = instance.run_storage.add_run(
+                DagsterRun(
+                    job_name="foo",
+                    run_id=make_new_run_id(),
+                    tags={"before": "migration", BACKFILL_ID_TAG: before_migration.backfill_id},
+                    status=DagsterRunStatus.NOT_STARTED,
+                )
+            )
+
+            # filtering by tags works before migration
+            assert (
+                instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                    0
+                ].backfill_id
+                == before_migration.backfill_id
+            )
 
             instance.upgrade()
             assert "backfill_tags" in get_tables(instance)
@@ -1063,6 +1082,24 @@ def test_add_backfill_tags(hostname, conn_string):
             assert ids_to_tags.get(before_migration.backfill_id) == before_migration.tags
             assert ids_to_tags[after_migration.backfill_id] == after_migration.tags
 
+            # filtering by tags works after migration
+            assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+            # delete the run that was added pre-migration to prove that tags filtering is happening on the
+            # backfill_tags table
+            instance.delete_run(pre_migration_run.run_id)
+            assert (
+                instance.get_backfills(filters=BulkActionsFilter(tags={"before": "migration"}))[
+                    0
+                ].backfill_id
+                == before_migration.backfill_id
+            )
+            assert (
+                instance.get_backfills(filters=BulkActionsFilter(tags={"after": "migration"}))[
+                    0
+                ].backfill_id
+                == after_migration.backfill_id
+            )
+
 
 def test_add_bulk_actions_job_name_column(hostname, conn_string):
     from dagster._core.remote_representation.origin import (
@@ -1099,7 +1136,7 @@ def test_add_bulk_actions_job_name_column(hostname, conn_string):
                     ),
                     repository_name="the_repo",
                 ),
-                partition_set_name=partition_set_snap_name_for_job_name("foo"),
+                partition_set_name=partition_set_snap_name_for_job_name("before_migration"),
             )
             before_migration = PartitionBackfill(
                 "before_migration",
@@ -1110,11 +1147,36 @@ def test_add_bulk_actions_job_name_column(hostname, conn_string):
                 backfill_timestamp=get_current_timestamp(),
             )
             instance.add_backfill(before_migration)
+            # filtering pre-migration relies on filtering runs, so add a run with the expected job_name
+            pre_migration_run = instance.run_storage.add_run(
+                DagsterRun(
+                    job_name=before_migration.job_name,
+                    run_id=make_new_run_id(),
+                    tags={BACKFILL_ID_TAG: before_migration.backfill_id},
+                    status=DagsterRunStatus.NOT_STARTED,
+                )
+            )
+            # filtering by job_name works before migration
+            assert (
+                instance.get_backfills(
+                    filters=BulkActionsFilter(job_name=before_migration.job_name)
+                )[0].backfill_id
+                == before_migration.backfill_id
+            )
 
             instance.upgrade()
             assert "job_name" in get_columns(instance, "bulk_actions")
 
+            partition_set_origin = RemotePartitionSetOrigin(
+                repository_origin=RemoteRepositoryOrigin(
+                    code_location_origin=GrpcServerCodeLocationOrigin(
+                        host="localhost", port=1234, location_name="test_location"
+                    ),
+                    repository_name="the_repo",
+                ),
+                partition_set_name=partition_set_snap_name_for_job_name("after_migration"),
+            )
             after_migration = PartitionBackfill(
                 "after_migration",
                 partition_set_origin=partition_set_origin,
@@ -1133,3 +1195,21 @@ def test_add_bulk_actions_job_name_column(hostname, conn_string):
             ids_to_job_name = {row[0]: row[1] for row in rows}
             assert ids_to_job_name[before_migration.backfill_id] == before_migration.job_name
             assert ids_to_job_name[after_migration.backfill_id] == after_migration.job_name
+
+            # filtering by job_name works after migration
+            assert instance.run_storage.has_built_index(BACKFILL_JOB_NAME_AND_TAGS)
+            # delete the run that was added pre-migration to prove that job_name filtering is
+            # happening on the bulk_actions job_name column
+            instance.delete_run(pre_migration_run.run_id)
+            assert (
+                instance.get_backfills(
+                    filters=BulkActionsFilter(job_name=before_migration.job_name)
+                )[0].backfill_id
+                == before_migration.backfill_id
+            )
+            assert (
+                instance.get_backfills(
+                    filters=BulkActionsFilter(job_name=after_migration.job_name)
+                )[0].backfill_id
+                == after_migration.backfill_id
+            )