Skip to content

Commit

Permalink
endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Oct 9, 2024
1 parent ef23e6d commit b5ac4d7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ def wipe(self) -> None:
def delete_run(self, run_id: str) -> None:
return self._storage.run_storage.delete_run(run_id)

def delete_runs(self, run_ids: Sequence[str]) -> None:
return self._storage.run_storage.delete_runs(run_ids)

def migrate(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
return self._storage.run_storage.migrate(print_fn, force_rebuild_all)

Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ def wipe(self) -> None:
def delete_run(self, run_id: str) -> None:
"""Remove a run from storage."""

@abstractmethod
def delete_runs(self, run_ids: Sequence[str]) -> None:
"""Remove a list of runs from storage."""

@property
def supports_bucket_queries(self) -> bool:
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,24 @@ def test_delete(self, storage):
storage.delete_run(run_id)
assert list(storage.get_runs()) == []

def test_delete_multiple(self, storage):
if not self.can_delete_runs():
pytest.skip("storage cannot delete runs")

assert storage
run_ids = []
for _ in range(3):
run_id = make_new_run_id()
storage.add_run(TestRunStorage.build_run(run_id=run_id, job_name="some_pipeline"))
run_ids.append(run_id)

storage.add_run(
TestRunStorage.build_run(run_id=make_new_run_id(), job_name="some_pipeline")
)
assert len(storage.get_runs()) == 4
storage.delete_runs(run_ids)
assert len(storage.get_runs()) == 1

def test_delete_with_tags(self, storage: RunStorage):
if not self.can_delete_runs():
pytest.skip("storage cannot delete runs")
Expand All @@ -1051,6 +1069,34 @@ def test_delete_with_tags(self, storage: RunStorage):
assert list(storage.get_runs()) == []
assert run_id not in [key for key, value in storage.get_run_tags(tag_keys=[run_id])]

def test_delete_multiple_with_tags(self, storage: RunStorage):
if not self.can_delete_runs():
pytest.skip("storage cannot delete runs")

assert storage
run_ids = []
for _ in range(3):
run_id = make_new_run_id()
storage.add_run(
TestRunStorage.build_run(
run_id=run_id,
job_name="some_pipeline",
tags={run_id: run_id},
)
)
run_ids.append(run_id)

storage.add_run(
TestRunStorage.build_run(
run_id=make_new_run_id(),
job_name="some_pipeline",
tags={"not_deleted": "true"},
)
)
assert len(storage.get_runs()) == 4
storage.delete_runs(run_ids)
assert len(storage.get_runs()) == 1

def test_wipe_tags(self, storage: RunStorage):
if not self.can_delete_runs():
pytest.skip("storage cannot delete")
Expand Down

0 comments on commit b5ac4d7

Please sign in to comment.