Skip to content

Commit

Permalink
add delete backfill to storage
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Oct 15, 2024
1 parent 58456f5 commit b69d197
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3108,6 +3108,9 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.update_backfill(partition_backfill)

def delete_backfill(self, backfill_id: str) -> None:
self._run_storage.delete_backfill(backfill_id)

@property
def should_start_background_run_thread(self) -> bool:
"""Gate on an experimental feature to start a thread that monitors for if the run should be canceled."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
return self._storage.run_storage.update_backfill(partition_backfill)

def delete_backfill(self, backfill_id: str) -> None:
return self._storage.run_storage.delete_backfill(backfill_id)

def get_run_partition_data(self, runs_filter: "RunsFilter") -> Sequence["RunPartitionData"]:
return self._storage.run_storage.get_run_partition_data(runs_filter)

Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ def add_backfill(self, partition_backfill: PartitionBackfill):
def update_backfill(self, partition_backfill: PartitionBackfill):
"""Update a partition backfill in run storage."""

@abstractmethod
def delete_backfill(self, backfill_id: str) -> None:
"""Delete a backfill from run storage."""

def alembic_version(self) -> Optional[AlembicVersion]:
return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,12 @@ def update_backfill(self, partition_backfill: PartitionBackfill) -> None:
)
)

def delete_backfill(self, backfill_id: str) -> None:
check.str_param(backfill_id, "backfill_id")
query = db.delete(BulkActionsTable).where(BulkActionsTable.c.key == backfill_id)
with self.connect() as conn:
conn.execute(query)

def get_cursor_values(self, keys: Set[str]) -> Mapping[str, str]:
check.set_param(keys, "keys", of_type=str)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,65 @@ def test_backfill_id_filtering(self, storage: RunStorage):
)
assert backfills_for_id[0].backfill_id == backfill.backfill_id

def test_delete_backfill(self, storage: RunStorage):
origin = self.fake_partition_set_origin("fake_partition_set")
backfills = storage.get_backfills()
assert len(backfills) == 0

one = PartitionBackfill(
"one",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)

storage.add_backfill(one)

two = PartitionBackfill(
"two",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)
storage.add_backfill(two)
for _ in range(3):
storage.add_run(
TestRunStorage.build_run(
run_id=make_new_run_id(),
job_name="some_pipeline",
status=DagsterRunStatus.SUCCESS,
tags={BACKFILL_ID_TAG: two.backfill_id},
)
)

storage.add_run(
TestRunStorage.build_run(
run_id=make_new_run_id(),
job_name="some_pipeline",
status=DagsterRunStatus.SUCCESS,
tags={},
)
)

storage.delete_backfill("one")
assert storage.get_backfill("one") is None

res = storage.get_backfill("two")
assert res
assert res.backfill_id == "two"

assert len(storage.get_runs()) == 4
storage.delete_backfill("two")
assert storage.get_backfill("two") is None
# deleting a backfill does not delete the runs that are part of the backfill
assert len(storage.get_runs()) == 4

def test_secondary_index(self, storage):
self._skip_in_memory(storage)

Expand Down

0 comments on commit b69d197

Please sign in to comment.