From f37dd47a7a49fcfb34d0b05ab459ab4bd66b00bd Mon Sep 17 00:00:00 2001 From: gibsondan Date: Tue, 3 Sep 2024 09:10:39 -0500 Subject: [PATCH] Refactor TestEventLogStorage in advance of making all storages able to wipe assets (#24133) Summary: Some storages can call .wipe(), others can only call .wipe_asset_key(). Also the tests that involved asset events were doing some funky stuff with generating events from an entirely separate instance that is no longer needed since the instance is a fixture and wiped after every test run. Test Plan: BK ## Summary & Motivation ## How I Tested These Changes ## Changelog [New | Bug | Docs] > Replace this message with a changelog entry, or `NOCHANGELOG` --- .../storage_tests/utils/event_log_storage.py | 2087 ++++++++--------- 1 file changed, 924 insertions(+), 1163 deletions(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 760a7ccf2c247..baca090978157 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -471,8 +471,14 @@ def test_init_log_storage(self, storage): def test_log_storage_run_not_found(self, storage): assert storage.get_logs_for_run(make_new_run_id()) == [] - def can_wipe(self): - # Whether the storage is allowed to wipe the event log + def can_wipe_event_log(self): + # Whether the storage is allowed to wipe the entire event log + return True + + def can_query_all_asset_records(self): + return True + + def can_write_cached_status(self): return True def can_watch(self): @@ -522,7 +528,7 @@ def test_event_log_storage_store_events_and_wipe(self, test_run_id, storage: Eve assert len(storage.get_logs_for_run(test_run_id)) == 1 assert storage.get_stats_for_run(test_run_id) - if self.can_wipe(): + if self.can_wipe_event_log(): storage.wipe() assert len(storage.get_logs_for_run(test_run_id)) == 0 @@ -556,7 +562,7 @@ def test_event_log_storage_store_with_multiple_runs( assert len(storage.get_logs_for_run(run_id)) == 1 assert storage.get_stats_for_run(run_id).steps_succeeded == 1 - if self.can_wipe(): + if self.can_wipe_event_log(): storage.wipe() for run_id in runs: assert len(storage.get_logs_for_run(run_id)) == 0 @@ -958,7 +964,7 @@ def test_wipe_sql_backed_event_log(self, test_run_id, storage): assert _event_types(out_events) == _event_types(events) - if self.can_wipe(): + if self.can_wipe_event_log(): storage.wipe() assert storage.get_logs_for_run(result.run_id) == [] @@ -1180,9 +1186,11 @@ def test_correct_timezone(self, test_run_id, storage): assert int(log.timestamp) == int(stats.start_time) assert int(log.timestamp) == int(curr_time) - def test_asset_materialization(self, storage, test_run_id): + def test_asset_materialization(self, storage, instance): asset_key = AssetKey(["path", "to", "asset_one"]) + test_run_id = make_new_run_id() + @op def materialize_one(_): yield AssetMaterialization( @@ -1199,38 +1207,31 @@ def materialize_one(_): def _ops(): materialize_one() - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) + _synthesize_events(_ops, instance=instance, run_id=test_run_id) - events_one, _ = _synthesize_events(_ops, instance=created_instance, run_id=test_run_id) + assert asset_key in set(storage.all_asset_keys()) - for event in events_one: - storage.store_event(event) - - assert asset_key in set(storage.all_asset_keys()) - - # legacy API - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - ) + # legacy API + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=asset_key, ) - assert len(records) == 1 - record = records[0] - assert isinstance(record, EventLogRecord) - assert record.event_log_entry.dagster_event - assert record.event_log_entry.dagster_event.asset_key == asset_key - - # new API - result = storage.fetch_materializations(asset_key, limit=100) - assert isinstance(result, EventRecordsResult) - assert len(result.records) == 1 - record = result.records[0] - assert record.event_log_entry.dagster_event - assert record.event_log_entry.dagster_event.asset_key == asset_key - assert result.cursor == EventLogCursor.from_storage_id(record.storage_id).to_string() + ) + assert len(records) == 1 + record = records[0] + assert isinstance(record, EventLogRecord) + assert record.event_log_entry.dagster_event + assert record.event_log_entry.dagster_event.asset_key == asset_key + + # new API + result = storage.fetch_materializations(asset_key, limit=100) + assert isinstance(result, EventRecordsResult) + assert len(result.records) == 1 + record = result.records[0] + assert record.event_log_entry.dagster_event + assert record.event_log_entry.dagster_event.asset_key == asset_key + assert result.cursor == EventLogCursor.from_storage_id(record.storage_id).to_string() def test_asset_materialization_range(self, storage, test_run_id): partitions_def = StaticPartitionsDefinition(["a", "b"]) @@ -1246,29 +1247,29 @@ def load_input(self, context): def foo(): return {"a": 1, "b": 2} - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) - + with instance_for_test() as test_instance: events, _ = _synthesize_events( [foo], - instance=instance, + instance=test_instance, run_id=test_run_id, tags={ASSET_PARTITION_RANGE_START_TAG: "a", ASSET_PARTITION_RANGE_END_TAG: "b"}, ) - materializations = [ - e for e in events if e.dagster_event.event_type == "ASSET_MATERIALIZATION" - ] - storage.store_event_batch(materializations) - result = storage.fetch_materializations(foo.key, limit=100) - assert len(result.records) == 2 + materializations = [ + e for e in events if e.dagster_event.event_type == "ASSET_MATERIALIZATION" + ] + storage.store_event_batch(materializations) + + result = storage.fetch_materializations(foo.key, limit=100) + assert len(result.records) == 2 - def test_asset_materialization_fetch(self, storage, test_run_id): + def test_asset_materialization_fetch(self, storage, instance): asset_key = AssetKey(["path", "to", "asset_one"]) other_asset_key = AssetKey(["path", "to", "asset_two"]) + test_run_id = make_new_run_id() + @op def materialize(_): yield AssetMaterialization(asset_key=asset_key, metadata={"count": 1}, partition="1") @@ -1284,120 +1285,113 @@ def materialize(_): def _ops(): materialize() - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events, _ = _synthesize_events(_ops, instance=created_instance, run_id=test_run_id) - - for event in events: - storage.store_event(event) + _synthesize_events(_ops, instance=instance, run_id=test_run_id) - assert asset_key in set(storage.all_asset_keys()) + assert asset_key in set(storage.all_asset_keys()) - def _get_counts(result): - assert isinstance(result, EventRecordsResult) - return [ - record.asset_materialization.metadata.get("count").value - for record in result.records - ] + def _get_counts(result): + assert isinstance(result, EventRecordsResult) + return [ + record.asset_materialization.metadata.get("count").value + for record in result.records + ] - # results come in descending order, by default - result = storage.fetch_materializations(asset_key, limit=100) - assert _get_counts(result) == [5, 4, 3, 2, 1] + # results come in descending order, by default + result = storage.fetch_materializations(asset_key, limit=100) + assert _get_counts(result) == [5, 4, 3, 2, 1] - result = storage.fetch_materializations(asset_key, limit=3) - assert _get_counts(result) == [5, 4, 3] + result = storage.fetch_materializations(asset_key, limit=3) + assert _get_counts(result) == [5, 4, 3] - # results come in ascending order, limited - result = storage.fetch_materializations(asset_key, limit=3, ascending=True) - assert _get_counts(result) == [1, 2, 3] + # results come in ascending order, limited + result = storage.fetch_materializations(asset_key, limit=3, ascending=True) + assert _get_counts(result) == [1, 2, 3] - storage_id_3 = result.records[2].storage_id - timestamp_3 = result.records[2].timestamp - storage_id_1 = result.records[0].storage_id - timestamp_1 = result.records[0].timestamp + storage_id_3 = result.records[2].storage_id + timestamp_3 = result.records[2].timestamp + storage_id_1 = result.records[0].storage_id + timestamp_1 = result.records[0].timestamp - other_key_result = storage.fetch_materializations( - other_asset_key, limit=1, ascending=True - ) - other_key_storage_id = other_key_result.records[0].storage_id + other_key_result = storage.fetch_materializations(other_asset_key, limit=1, ascending=True) + other_key_storage_id = other_key_result.records[0].storage_id - # filter by after storage id - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - after_storage_id=storage_id_1, - ), - limit=100, - ) - assert _get_counts(result) == [5, 4, 3, 2] + # filter by after storage id + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + after_storage_id=storage_id_1, + ), + limit=100, + ) + assert _get_counts(result) == [5, 4, 3, 2] - # filter by before storage id - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - before_storage_id=storage_id_3, - ), - limit=100, - ) - assert _get_counts(result) == [2, 1] + # filter by before storage id + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + before_storage_id=storage_id_3, + ), + limit=100, + ) + assert _get_counts(result) == [2, 1] - # filter by before and after storage id - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - before_storage_id=storage_id_3, - after_storage_id=storage_id_1, - ), - limit=100, - ) - assert _get_counts(result) == [2] + # filter by before and after storage id + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + before_storage_id=storage_id_3, + after_storage_id=storage_id_1, + ), + limit=100, + ) + assert _get_counts(result) == [2] - # filter by timestamp - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - before_timestamp=timestamp_3, - ), - limit=100, - ) - assert _get_counts(result) == [2, 1] + # filter by timestamp + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + before_timestamp=timestamp_3, + ), + limit=100, + ) + assert _get_counts(result) == [2, 1] - # filter by before and after timestamp - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - before_timestamp=timestamp_3, - after_timestamp=timestamp_1, - ), - limit=100, - ) - assert _get_counts(result) == [2] + # filter by before and after timestamp + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + before_timestamp=timestamp_3, + after_timestamp=timestamp_1, + ), + limit=100, + ) + assert _get_counts(result) == [2] - # filter by storage ids - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - storage_ids=[storage_id_1, storage_id_3, other_key_storage_id], - ), - limit=100, - ) - assert _get_counts(result) == [3, 1] + # filter by storage ids + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + storage_ids=[storage_id_1, storage_id_3, other_key_storage_id], + ), + limit=100, + ) + assert _get_counts(result) == [3, 1] - # filter by partitions - result = storage.fetch_materializations( - AssetRecordsFilter( - asset_key=asset_key, - asset_partitions=["1", "3", "5"], - ), - limit=100, - ) - assert _get_counts(result) == [5, 3, 1] + # filter by partitions + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["1", "3", "5"], + ), + limit=100, + ) + assert _get_counts(result) == [5, 3, 1] - def test_asset_observation_fetch(self, storage, test_run_id): + def test_asset_observation_fetch(self, storage, instance): asset_key = AssetKey(["path", "to", "asset_one"]) + test_run_id = make_new_run_id() + @op def observe(_): yield AssetObservation(asset_key=asset_key, metadata={"count": 1}, partition="1") @@ -1410,117 +1404,109 @@ def observe(_): def _ops(): observe() - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events, _ = _synthesize_events(_ops, instance=created_instance, run_id=test_run_id) - - for event in events: - storage.store_event(event) + _synthesize_events(_ops, instance=instance, run_id=test_run_id) - assert asset_key in set(storage.all_asset_keys()) + assert asset_key in set(storage.all_asset_keys()) - def _get_counts(result): - assert isinstance(result, EventRecordsResult) - return [ - record.asset_observation.metadata.get("count").value - for record in result.records - ] + def _get_counts(result): + assert isinstance(result, EventRecordsResult) + return [ + record.asset_observation.metadata.get("count").value for record in result.records + ] - # results come in descending order, by default - result = storage.fetch_observations(asset_key, limit=100) - assert _get_counts(result) == [5, 4, 3, 2, 1] + # results come in descending order, by default + result = storage.fetch_observations(asset_key, limit=100) + assert _get_counts(result) == [5, 4, 3, 2, 1] - result = storage.fetch_observations(asset_key, limit=3) - assert _get_counts(result) == [5, 4, 3] + result = storage.fetch_observations(asset_key, limit=3) + assert _get_counts(result) == [5, 4, 3] - # results come in ascending order, limited - result = storage.fetch_observations(asset_key, limit=3, ascending=True) - assert _get_counts(result) == [1, 2, 3] + # results come in ascending order, limited + result = storage.fetch_observations(asset_key, limit=3, ascending=True) + assert _get_counts(result) == [1, 2, 3] - storage_id_3 = result.records[2].storage_id - timestamp_3 = result.records[2].timestamp - storage_id_1 = result.records[0].storage_id - timestamp_1 = result.records[0].timestamp + storage_id_3 = result.records[2].storage_id + timestamp_3 = result.records[2].timestamp + storage_id_1 = result.records[0].storage_id + timestamp_1 = result.records[0].timestamp - # filter by after storage id - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - after_storage_id=storage_id_1, - ), - limit=100, - ) - assert _get_counts(result) == [5, 4, 3, 2] + # filter by after storage id + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + after_storage_id=storage_id_1, + ), + limit=100, + ) + assert _get_counts(result) == [5, 4, 3, 2] - # filter by before storage id - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - before_storage_id=storage_id_3, - ), - limit=100, - ) - assert _get_counts(result) == [2, 1] + # filter by before storage id + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + before_storage_id=storage_id_3, + ), + limit=100, + ) + assert _get_counts(result) == [2, 1] - # filter by before and after storage id - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - before_storage_id=storage_id_3, - after_storage_id=storage_id_1, - ), - limit=100, - ) - assert _get_counts(result) == [2] + # filter by before and after storage id + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + before_storage_id=storage_id_3, + after_storage_id=storage_id_1, + ), + limit=100, + ) + assert _get_counts(result) == [2] - # filter by timestamp - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - before_timestamp=timestamp_3, - ), - limit=100, - ) - assert _get_counts(result) == [2, 1] + # filter by timestamp + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + before_timestamp=timestamp_3, + ), + limit=100, + ) + assert _get_counts(result) == [2, 1] - # filter by before and after timestamp - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - before_timestamp=timestamp_3, - after_timestamp=timestamp_1, - ), - limit=100, - ) - assert _get_counts(result) == [2] + # filter by before and after timestamp + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + before_timestamp=timestamp_3, + after_timestamp=timestamp_1, + ), + limit=100, + ) + assert _get_counts(result) == [2] - # filter by storage ids - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - storage_ids=[storage_id_1, storage_id_3], - ), - limit=100, - ) - assert _get_counts(result) == [3, 1] + # filter by storage ids + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + storage_ids=[storage_id_1, storage_id_3], + ), + limit=100, + ) + assert _get_counts(result) == [3, 1] - # filter by partitions - result = storage.fetch_observations( - AssetRecordsFilter( - asset_key=asset_key, - asset_partitions=["1", "3", "5"], - ), - limit=100, - ) - assert _get_counts(result) == [5, 3, 1] + # filter by partitions + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["1", "3", "5"], + ), + limit=100, + ) + assert _get_counts(result) == [5, 3, 1] def test_asset_materialization_null_key_fails(self): with pytest.raises(check.CheckError): AssetMaterialization(asset_key=None) - def test_asset_events_error_parsing(self, storage): + def test_asset_events_error_parsing(self, storage, instance): if not isinstance(storage, SqlEventLogStorage): pytest.skip("This test is for SQL-backed Event Log behavior") _logs = [] @@ -1538,77 +1524,72 @@ def materialize_one(_): def _ops(): materialize_one() - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) - events_one, _ = _synthesize_events(_ops, instance=instance) - for event in events_one: - storage.store_event(event) + _synthesize_events(_ops, instance=instance) - with ExitStack() as stack: - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sql_event_log.logging.warning", - side_effect=mock_log, - ) + with ExitStack() as stack: + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sql_event_log.logging.warning", + side_effect=mock_log, ) - # for generic sql-based event log storage - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sql_event_log.deserialize_value", - return_value="not_an_event_record", - ) + ) + # for generic sql-based event log storage + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sql_event_log.deserialize_value", + return_value="not_an_event_record", ) - # for sqlite event log storage, which overrides the record fetching implementation - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sqlite.sqlite_event_log.deserialize_value", - return_value="not_an_event_record", - ) + ) + # for sqlite event log storage, which overrides the record fetching implementation + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sqlite.sqlite_event_log.deserialize_value", + return_value="not_an_event_record", ) + ) - assert asset_key in set(storage.all_asset_keys()) - _records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - ) + assert asset_key in set(storage.all_asset_keys()) + _records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=asset_key, ) - assert len(_logs) == 1 - assert re.match("Could not resolve event record as EventLogEntry", _logs[0]) - - with ExitStack() as stack: - _logs = [] # reset logs - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sql_event_log.logging.warning", - side_effect=mock_log, - ) + ) + assert len(_logs) == 1 + assert re.match("Could not resolve event record as EventLogEntry", _logs[0]) + + with ExitStack() as stack: + _logs = [] # reset logs + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sql_event_log.logging.warning", + side_effect=mock_log, ) + ) - # for generic sql-based event log storage - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sql_event_log.deserialize_value", - side_effect=seven.JSONDecodeError("error", "", 0), - ) + # for generic sql-based event log storage + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sql_event_log.deserialize_value", + side_effect=seven.JSONDecodeError("error", "", 0), ) - # for sqlite event log storage, which overrides the record fetching implementation - stack.enter_context( - mock.patch( - "dagster._core.storage.event_log.sqlite.sqlite_event_log.deserialize_value", - side_effect=seven.JSONDecodeError("error", "", 0), - ) + ) + # for sqlite event log storage, which overrides the record fetching implementation + stack.enter_context( + mock.patch( + "dagster._core.storage.event_log.sqlite.sqlite_event_log.deserialize_value", + side_effect=seven.JSONDecodeError("error", "", 0), ) - assert asset_key in set(storage.all_asset_keys()) - _records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - ) + ) + assert asset_key in set(storage.all_asset_keys()) + _records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=asset_key, ) - assert len(_logs) == 1 - assert re.match("Could not parse event record id", _logs[0]) + ) + assert len(_logs) == 1 + assert re.match("Could not parse event record id", _logs[0]) def test_secondary_index_asset_keys(self, storage, instance): asset_key_one = AssetKey(["one"]) @@ -2215,7 +2196,7 @@ def _unsub(_x, _y): assert len(event_list) == len(safe_events) assert all([isinstance(event, EventLogEntry) for event in event_list]) - def test_engine_event_markers(self, storage): + def test_engine_event_markers(self, storage, instance): @op def return_one(_): return 1 @@ -2224,28 +2205,24 @@ def return_one(_): def a_job(): return_one() - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) - - run_id = make_new_run_id() - run = instance.create_run_for_job(a_job, run_id=run_id) + run_id = make_new_run_id() + run = instance.create_run_for_job(a_job, run_id=run_id) - instance.report_engine_event( - "blah blah", - run, - EngineEventData(marker_start="FOO"), - step_key="return_one", - ) - instance.report_engine_event( - "blah blah", - run, - EngineEventData(marker_end="FOO"), - step_key="return_one", - ) - logs = storage.get_logs_for_run(run_id) - for entry in logs: - assert entry.step_key == "return_one" + instance.report_engine_event( + "blah blah", + run, + EngineEventData(marker_start="FOO"), + step_key="return_one", + ) + instance.report_engine_event( + "blah blah", + run, + EngineEventData(marker_end="FOO"), + step_key="return_one", + ) + logs = storage.get_logs_for_run(run_id) + for entry in logs: + assert entry.step_key == "return_one" def test_latest_materializations(self, storage, instance): @op @@ -2288,183 +2265,118 @@ def _fetch_events(storage): events_by_key = _fetch_events(storage) assert len(events_by_key) == 4 - # wipe 2 of the assets, make sure we respect that - if self.can_wipe(): - storage.wipe_asset(AssetKey("a")) - storage.wipe_asset(AssetKey("b")) - events_by_key = _fetch_events(storage) - assert events_by_key.get(AssetKey("a")) is None - assert events_by_key.get(AssetKey("b")) is None - - # rematerialize one of the wiped assets, one of the existing assets - events, _ = _synthesize_events(lambda: two(), run_id=run_id_2) - for event in events: - storage.store_event(event) + storage.wipe_asset(AssetKey("a")) + storage.wipe_asset(AssetKey("b")) + events_by_key = _fetch_events(storage) + assert events_by_key.get(AssetKey("a")) is None + assert events_by_key.get(AssetKey("b")) is None - events_by_key = _fetch_events(storage) - assert events_by_key.get(AssetKey("a")) is None + # rematerialize one of the wiped assets, one of the existing assets + events, _ = _synthesize_events(lambda: two(), run_id=run_id_2) + for event in events: + storage.store_event(event) - else: - events, _ = _synthesize_events(lambda: two(), run_id=run_id_2) - for event in events: - storage.store_event(event) - events_by_key = _fetch_events(storage) + events_by_key = _fetch_events(storage) + assert events_by_key.get(AssetKey("a")) is None def test_asset_keys(self, storage, instance): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events_one, result1 = _synthesize_events( - lambda: one_asset_op(), instance=created_instance - ) - events_two, result2 = _synthesize_events( - lambda: two_asset_ops(), instance=created_instance - ) - - with create_and_delete_test_runs(instance, [result1.run_id, result2.run_id]): - for event in events_one + events_two: - storage.store_event(event) + _synthesize_events(lambda: one_asset_op(), instance=instance) + _synthesize_events(lambda: two_asset_ops(), instance=instance) - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 3 - assert set([asset_key.to_string() for asset_key in asset_keys]) == set( - ['["asset_1"]', '["asset_2"]', '["path", "to", "asset_3"]'] - ) + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 3 + assert set([asset_key.to_string() for asset_key in asset_keys]) == set( + ['["asset_1"]', '["asset_2"]', '["path", "to", "asset_3"]'] + ) def test_has_asset_key(self, storage, instance): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) + _synthesize_events(lambda: one_asset_op(), instance=instance) + _synthesize_events(lambda: two_asset_ops(), instance=instance) - events_one, result_1 = _synthesize_events( - lambda: one_asset_op(), instance=created_instance - ) - events_two, result_2 = _synthesize_events( - lambda: two_asset_ops(), instance=created_instance - ) - - with create_and_delete_test_runs(instance, [result_1.run_id, result_2.run_id]): - for event in events_one + events_two: - storage.store_event(event) - - assert storage.has_asset_key(AssetKey(["path", "to", "asset_3"])) - assert not storage.has_asset_key(AssetKey(["path", "to", "bogus", "asset"])) + assert storage.has_asset_key(AssetKey(["path", "to", "asset_3"])) + assert not storage.has_asset_key(AssetKey(["path", "to", "bogus", "asset"])) - def test_asset_normalization(self, storage, test_run_id): - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) + def test_asset_normalization(self, storage, instance): + test_run_id = make_new_run_id() - @op - def op_normalization(_): - yield AssetMaterialization(asset_key="path/to-asset_4") - yield Output(1) + @op + def op_normalization(_): + yield AssetMaterialization(asset_key="path/to-asset_4") + yield Output(1) - events, _ = _synthesize_events( - lambda: op_normalization(), instance=instance, run_id=test_run_id - ) - for event in events: - storage.store_event(event) + events, _ = _synthesize_events( + lambda: op_normalization(), instance=instance, run_id=test_run_id + ) + for event in events: + storage.store_event(event) - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 1 - asset_key = asset_keys[0] - assert asset_key.to_string() == '["path", "to", "asset_4"]' - assert asset_key.path == ["path", "to", "asset_4"] + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 1 + asset_key = asset_keys[0] + assert asset_key.to_string() == '["path", "to", "asset_4"]' + assert asset_key.path == ["path", "to", "asset_4"] def test_asset_wipe(self, storage, instance): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - one_run_id = make_new_run_id() - two_run_id = make_new_run_id() - events_one, _ = _synthesize_events( - lambda: one_asset_op(), run_id=one_run_id, instance=created_instance - ) - events_two, _ = _synthesize_events( - lambda: two_asset_ops(), run_id=two_run_id, instance=created_instance - ) - - with create_and_delete_test_runs(instance, [one_run_id, two_run_id]): - for event in events_one + events_two: - storage.store_event(event) - - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 3 - assert storage.has_asset_key(AssetKey("asset_1")) - - log_count = len(storage.get_logs_for_run(one_run_id)) - if self.can_wipe(): - for asset_key in asset_keys: - storage.wipe_asset(asset_key) - - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 0 - assert not storage.has_asset_key(AssetKey("asset_1")) - assert log_count == len(storage.get_logs_for_run(one_run_id)) - - one_run_id = make_new_run_id() - events_one, _ = _synthesize_events( - lambda: one_asset_op(), - run_id=one_run_id, - instance=created_instance, - ) - with create_and_delete_test_runs(instance, [one_run_id]): - for event in events_one: - storage.store_event(event) - - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 1 - assert storage.has_asset_key(AssetKey("asset_1")) + one_run_id = make_new_run_id() + two_run_id = make_new_run_id() + _synthesize_events(lambda: one_asset_op(), run_id=one_run_id, instance=instance) + _synthesize_events(lambda: two_asset_ops(), run_id=two_run_id, instance=instance) + + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 3 + assert storage.has_asset_key(AssetKey("asset_1")) + + log_count = len(storage.get_logs_for_run(one_run_id)) + for asset_key in asset_keys: + storage.wipe_asset(asset_key) + + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 0 + assert not storage.has_asset_key(AssetKey("asset_1")) + assert log_count == len(storage.get_logs_for_run(one_run_id)) + + one_run_id = make_new_run_id() + _synthesize_events( + lambda: one_asset_op(), + run_id=one_run_id, + instance=instance, + ) + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 1 + assert storage.has_asset_key(AssetKey("asset_1")) def test_asset_secondary_index(self, storage, instance): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events_one, result = _synthesize_events( - lambda: one_asset_op(), instance=created_instance - ) - - with create_and_delete_test_runs(instance, [result.run_id]): - for event in events_one: - storage.store_event(event) - - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 1 - migrate_asset_key_data(storage) - - two_first_run_id = make_new_run_id() - two_second_run_id = make_new_run_id() - events_two, _ = _synthesize_events( - lambda: two_asset_ops(), - run_id=two_first_run_id, - instance=created_instance, - ) - events_two_two, _ = _synthesize_events( - lambda: two_asset_ops(), - run_id=two_second_run_id, - instance=created_instance, - ) - - with create_and_delete_test_runs(instance, [two_first_run_id, two_second_run_id]): - for event in events_two + events_two_two: - storage.store_event(event) + _synthesize_events(lambda: one_asset_op(), instance=instance) + + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 1 + migrate_asset_key_data(storage) + + two_first_run_id = make_new_run_id() + two_second_run_id = make_new_run_id() + _synthesize_events( + lambda: two_asset_ops(), + run_id=two_first_run_id, + instance=instance, + ) + _synthesize_events( + lambda: two_asset_ops(), + run_id=two_second_run_id, + instance=instance, + ) - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 3 + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 3 - storage.delete_events(two_first_run_id) - asset_keys = storage.all_asset_keys() - assert len(asset_keys) == 3 + storage.delete_events(two_first_run_id) + asset_keys = storage.all_asset_keys() + assert len(asset_keys) == 3 - storage.delete_events(two_second_run_id) - asset_keys = storage.all_asset_keys() - # we now no longer keep the asset catalog keys in sync with the presence of - # asset events in the event log - # assert len(asset_keys) == 1 + storage.delete_events(two_second_run_id) + asset_keys = storage.all_asset_keys() + # we now no longer keep the asset catalog keys in sync with the presence of + # asset events in the event log + # assert len(asset_keys) == 1 def test_asset_partition_query(self, storage, instance): @op(config_schema={"partition": Field(str, is_required=False)}) @@ -2475,45 +2387,40 @@ def op_partitioned(context): ) yield Output(1) - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) + get_partitioned_config = lambda partition: { + "ops": {"op_partitioned": {"config": {"partition": partition}}} + } - get_partitioned_config = lambda partition: { - "ops": {"op_partitioned": {"config": {"partition": partition}}} - } + partitions = ["a", "a", "b", "c"] + run_ids = [make_new_run_id() for _ in partitions] + for partition, run_id in zip([f"partition_{x}" for x in partitions], run_ids): + _synthesize_events( + lambda: op_partitioned(), + instance=instance, + run_config=get_partitioned_config(partition), + run_id=run_id, + ) - partitions = ["a", "a", "b", "c"] - run_ids = [make_new_run_id() for _ in partitions] - with create_and_delete_test_runs(instance, run_ids): - for partition, run_id in zip([f"partition_{x}" for x in partitions], run_ids): - run_events, _ = _synthesize_events( - lambda: op_partitioned(), - instance=created_instance, - run_config=get_partitioned_config(partition), - run_id=run_id, - ) - for event in run_events: - storage.store_event(event) + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=AssetKey("asset_key"), + ) + ) + assert len(records) == 4 - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=AssetKey("asset_key"), - ) - ) - assert len(records) == 4 + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=AssetKey("asset_key"), + asset_partitions=["partition_a", "partition_b"], + ) + ) + assert len(records) == 3 - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=AssetKey("asset_key"), - asset_partitions=["partition_a", "partition_b"], - ) - ) - assert len(records) == 3 + def test_get_asset_keys(self, storage, instance): + test_run_id = make_new_run_id() - def test_get_asset_keys(self, storage, test_run_id): @op def gen_op(): yield AssetMaterialization(asset_key=AssetKey(["a"])) @@ -2524,46 +2431,38 @@ def gen_op(): yield AssetMaterialization(asset_key=AssetKey(["b", "z"])) yield Output(1) - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events, _ = _synthesize_events( - lambda: gen_op(), instance=created_instance, run_id=test_run_id - ) - for event in events: - storage.store_event(event) - - asset_keys = storage.get_asset_keys() - assert len(asset_keys) == 6 - # should come out sorted - assert [asset_key.to_string() for asset_key in asset_keys] == [ - '["a"]', - '["b", "x"]', - '["b", "y"]', - '["b", "z"]', - '["banana"]', - '["c"]', - ] - - # pagination fields - asset_keys = storage.get_asset_keys(cursor='["b", "y"]', limit=1) - assert len(asset_keys) == 1 - assert asset_keys[0].to_string() == '["b", "z"]' + _synthesize_events(lambda: gen_op(), instance=instance, run_id=test_run_id) + + asset_keys = storage.get_asset_keys() + assert len(asset_keys) == 6 + # should come out sorted + assert [asset_key.to_string() for asset_key in asset_keys] == [ + '["a"]', + '["b", "x"]', + '["b", "y"]', + '["b", "z"]', + '["banana"]', + '["c"]', + ] - # pagination still works even if the key is not in the list - asset_keys = storage.get_asset_keys(cursor='["b", "w"]', limit=1) - assert len(asset_keys) == 1 - assert asset_keys[0].to_string() == '["b", "x"]' - - # prefix filter - asset_keys = storage.get_asset_keys(prefix=["b"]) - assert len(asset_keys) == 3 - assert [asset_key.to_string() for asset_key in asset_keys] == [ - '["b", "x"]', - '["b", "y"]', - '["b", "z"]', - ] + # pagination fields + asset_keys = storage.get_asset_keys(cursor='["b", "y"]', limit=1) + assert len(asset_keys) == 1 + assert asset_keys[0].to_string() == '["b", "z"]' + + # pagination still works even if the key is not in the list + asset_keys = storage.get_asset_keys(cursor='["b", "w"]', limit=1) + assert len(asset_keys) == 1 + assert asset_keys[0].to_string() == '["b", "x"]' + + # prefix filter + asset_keys = storage.get_asset_keys(prefix=["b"]) + assert len(asset_keys) == 3 + assert [asset_key.to_string() for asset_key in asset_keys] == [ + '["b", "x"]', + '["b", "y"]', + '["b", "z"]', + ] def test_get_materialized_partitions(self, storage, instance): a = AssetKey("no_materializations_asset") @@ -2590,87 +2489,73 @@ def materialize_three(): yield AssetMaterialization(c, partition="c") yield Output(None) - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - run_id_1 = make_new_run_id() - run_id_2 = make_new_run_id() - run_id_3 = make_new_run_id() - run_id_4 = make_new_run_id() - - with create_and_delete_test_runs(instance, [run_id_1, run_id_2, run_id_3]): - cursor_run1 = _store_materialization_events( - storage, materialize, created_instance, run_id_1 - ) - assert storage.get_materialized_partitions(a) == set() - assert storage.get_materialized_partitions(b) == set() - assert storage.get_materialized_partitions(c) == {"a", "b"} - - cursor_run2 = _store_materialization_events( - storage, materialize_two, created_instance, run_id_2 - ) - _store_materialization_events( - storage, materialize_three, created_instance, run_id_3 - ) - - # test that the cursoring logic works - assert storage.get_materialized_partitions(a) == set() - assert storage.get_materialized_partitions(b) == set() - assert storage.get_materialized_partitions(c) == {"a", "b", "c"} - assert storage.get_materialized_partitions(d) == {"x"} - assert storage.get_materialized_partitions(a, before_cursor=cursor_run1) == set() - assert storage.get_materialized_partitions(b, before_cursor=cursor_run1) == set() - assert storage.get_materialized_partitions(c, before_cursor=cursor_run1) == { - "a", - "b", - } - assert storage.get_materialized_partitions(d, before_cursor=cursor_run1) == set() - assert storage.get_materialized_partitions(a, after_cursor=cursor_run1) == set() - assert storage.get_materialized_partitions(b, after_cursor=cursor_run1) == set() - assert storage.get_materialized_partitions(c, after_cursor=cursor_run1) == { - "a", - "c", - } - assert storage.get_materialized_partitions(d, after_cursor=cursor_run1) == {"x"} - assert ( - storage.get_materialized_partitions( - a, before_cursor=cursor_run2, after_cursor=cursor_run1 - ) - == set() - ) - assert ( - storage.get_materialized_partitions( - b, before_cursor=cursor_run2, after_cursor=cursor_run1 - ) - == set() - ) - assert storage.get_materialized_partitions( - c, before_cursor=cursor_run2, after_cursor=cursor_run1 - ) == {"a"} - assert storage.get_materialized_partitions( - d, before_cursor=cursor_run2, after_cursor=cursor_run1 - ) == {"x"} - assert storage.get_materialized_partitions(a, after_cursor=9999999999) == set() - assert storage.get_materialized_partitions(b, after_cursor=9999999999) == set() - assert storage.get_materialized_partitions(c, after_cursor=9999999999) == set() - assert storage.get_materialized_partitions(d, after_cursor=9999999999) == set() - - # wipe asset, make sure we respect that - if self.can_wipe(): - storage.wipe_asset(c) - assert storage.get_materialized_partitions(c) == set() - - _store_materialization_events( - storage, materialize_two, created_instance, run_id_4 - ) - - assert storage.get_materialized_partitions(c) == {"a"} - assert storage.get_materialized_partitions(d) == {"x"} + run_id_1 = make_new_run_id() + run_id_2 = make_new_run_id() + run_id_3 = make_new_run_id() + run_id_4 = make_new_run_id() - # make sure adding an after_cursor doesn't mess with the wiped events - assert storage.get_materialized_partitions(c, after_cursor=9999999999) == set() - assert storage.get_materialized_partitions(d, after_cursor=9999999999) == set() + cursor_run1 = _store_materialization_events(storage, materialize, instance, run_id_1) + assert storage.get_materialized_partitions(a) == set() + assert storage.get_materialized_partitions(b) == set() + assert storage.get_materialized_partitions(c) == {"a", "b"} + + cursor_run2 = _store_materialization_events(storage, materialize_two, instance, run_id_2) + _store_materialization_events(storage, materialize_three, instance, run_id_3) + + # test that the cursoring logic works + assert storage.get_materialized_partitions(a) == set() + assert storage.get_materialized_partitions(b) == set() + assert storage.get_materialized_partitions(c) == {"a", "b", "c"} + assert storage.get_materialized_partitions(d) == {"x"} + assert storage.get_materialized_partitions(a, before_cursor=cursor_run1) == set() + assert storage.get_materialized_partitions(b, before_cursor=cursor_run1) == set() + assert storage.get_materialized_partitions(c, before_cursor=cursor_run1) == { + "a", + "b", + } + assert storage.get_materialized_partitions(d, before_cursor=cursor_run1) == set() + assert storage.get_materialized_partitions(a, after_cursor=cursor_run1) == set() + assert storage.get_materialized_partitions(b, after_cursor=cursor_run1) == set() + assert storage.get_materialized_partitions(c, after_cursor=cursor_run1) == { + "a", + "c", + } + assert storage.get_materialized_partitions(d, after_cursor=cursor_run1) == {"x"} + assert ( + storage.get_materialized_partitions( + a, before_cursor=cursor_run2, after_cursor=cursor_run1 + ) + == set() + ) + assert ( + storage.get_materialized_partitions( + b, before_cursor=cursor_run2, after_cursor=cursor_run1 + ) + == set() + ) + assert storage.get_materialized_partitions( + c, before_cursor=cursor_run2, after_cursor=cursor_run1 + ) == {"a"} + assert storage.get_materialized_partitions( + d, before_cursor=cursor_run2, after_cursor=cursor_run1 + ) == {"x"} + assert storage.get_materialized_partitions(a, after_cursor=9999999999) == set() + assert storage.get_materialized_partitions(b, after_cursor=9999999999) == set() + assert storage.get_materialized_partitions(c, after_cursor=9999999999) == set() + assert storage.get_materialized_partitions(d, after_cursor=9999999999) == set() + + # wipe asset, make sure we respect that + storage.wipe_asset(c) + assert storage.get_materialized_partitions(c) == set() + + _store_materialization_events(storage, materialize_two, instance, run_id_4) + + assert storage.get_materialized_partitions(c) == {"a"} + assert storage.get_materialized_partitions(d) == {"x"} + + # make sure adding an after_cursor doesn't mess with the wiped events + assert storage.get_materialized_partitions(c, after_cursor=9999999999) == set() + assert storage.get_materialized_partitions(d, after_cursor=9999999999) == set() def test_get_latest_storage_ids_by_partition(self, storage, instance): a = AssetKey(["a"]) @@ -2735,13 +2620,12 @@ def _store_partition_event(asset_key, partition) -> int: latest_storage_ids["p3"] = _store_partition_event(a, "p3") _assert_storage_matches(latest_storage_ids) - if self.can_wipe(): - storage.wipe_asset(a) - latest_storage_ids = {} - _assert_storage_matches(latest_storage_ids) + storage.wipe_asset(a) + latest_storage_ids = {} + _assert_storage_matches(latest_storage_ids) - latest_storage_ids["p1"] = _store_partition_event(a, "p1") - _assert_storage_matches(latest_storage_ids) + latest_storage_ids["p1"] = _store_partition_event(a, "p1") + _assert_storage_matches(latest_storage_ids) @pytest.mark.parametrize( "dagster_event_type", @@ -2881,22 +2765,21 @@ def _store_partition_event(asset_key, partition, tags) -> int: "p3": {"dagster/a": "1", "dagster/b": "1"}, } - if self.can_wipe(): - storage.wipe_asset(a) - assert ( - storage.get_latest_tags_by_partition( - a, - dagster_event_type, - tag_keys=["dagster/a", "dagster/b"], - ) - == {} + storage.wipe_asset(a) + assert ( + storage.get_latest_tags_by_partition( + a, + dagster_event_type, + tag_keys=["dagster/a", "dagster/b"], ) - _store_partition_event(a, "p1", tags={"dagster/a": "3", "dagster/b": "3"}) - assert storage.get_latest_tags_by_partition( - a, dagster_event_type, tag_keys=["dagster/a", "dagster/b"] - ) == { - "p1": {"dagster/a": "3", "dagster/b": "3"}, - } + == {} + ) + _store_partition_event(a, "p1", tags={"dagster/a": "3", "dagster/b": "3"}) + assert storage.get_latest_tags_by_partition( + a, dagster_event_type, tag_keys=["dagster/a", "dagster/b"] + ) == { + "p1": {"dagster/a": "3", "dagster/b": "3"}, + } @pytest.mark.parametrize( "dagster_event_type", @@ -3021,22 +2904,21 @@ def _store_partition_event(asset_key, partition, tags) -> int: "p3": {DATA_VERSION_TAG: "1"}, } - if self.can_wipe(): - storage.wipe_asset(a) - assert ( - storage.get_latest_tags_by_partition( - a, - dagster_event_type, - tag_keys=[DATA_VERSION_TAG], - ) - == {} + storage.wipe_asset(a) + assert ( + storage.get_latest_tags_by_partition( + a, + dagster_event_type, + tag_keys=[DATA_VERSION_TAG], ) - _store_partition_event(a, "p1", tags={DATA_VERSION_TAG: "3"}) - assert storage.get_latest_tags_by_partition( - a, dagster_event_type, tag_keys=[DATA_VERSION_TAG] - ) == { - "p1": {DATA_VERSION_TAG: "3"}, - } + == {} + ) + _store_partition_event(a, "p1", tags={DATA_VERSION_TAG: "3"}) + assert storage.get_latest_tags_by_partition( + a, dagster_event_type, tag_keys=[DATA_VERSION_TAG] + ) == { + "p1": {DATA_VERSION_TAG: "3"}, + } def test_get_latest_asset_partition_materialization_attempts_without_materializations( self, storage, instance @@ -3191,64 +3073,63 @@ def _assert_matches_not_including_event_id(result, expected): ) # wipe asset, make sure we respect that - if self.can_wipe(): - storage.wipe_asset(a) - _assert_matches_not_including_event_id( - storage.get_latest_asset_partition_materialization_attempts_without_materializations( - a - ), - {}, - ) - - storage.store_event( - EventLogEntry( - error_info=None, - level="debug", - user_message="", - run_id=run_id_4, - timestamp=time.time(), - dagster_event=DagsterEvent( - DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, - "nonce", - event_specific_data=AssetMaterializationPlannedData(a, "bar"), - ), - ) - ) + storage.wipe_asset(a) + _assert_matches_not_including_event_id( + storage.get_latest_asset_partition_materialization_attempts_without_materializations( + a + ), + {}, + ) - # new materialization planned appears - _assert_matches_not_including_event_id( - storage.get_latest_asset_partition_materialization_attempts_without_materializations( - a + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run_id_4, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, + "nonce", + event_specific_data=AssetMaterializationPlannedData(a, "bar"), ), - { - "bar": run_id_4, - }, ) + ) - storage.store_event( - EventLogEntry( - error_info=None, - level="debug", - user_message="", - run_id=run_id_4, - timestamp=time.time(), - dagster_event=DagsterEvent( - DagsterEventType.ASSET_MATERIALIZATION.value, - "nonce", - event_specific_data=StepMaterializationData( - AssetMaterialization(asset_key=a, partition="bar") - ), + # new materialization planned appears + _assert_matches_not_including_event_id( + storage.get_latest_asset_partition_materialization_attempts_without_materializations( + a + ), + { + "bar": run_id_4, + }, + ) + + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run_id_4, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION.value, + "nonce", + event_specific_data=StepMaterializationData( + AssetMaterialization(asset_key=a, partition="bar") ), - ) - ) - - # and goes away - _assert_matches_not_including_event_id( - storage.get_latest_asset_partition_materialization_attempts_without_materializations( - a ), - {}, ) + ) + + # and goes away + _assert_matches_not_including_event_id( + storage.get_latest_asset_partition_materialization_attempts_without_materializations( + a + ), + {}, + ) def test_get_latest_asset_partition_materialization_attempts_without_materializations_external_asset( self, storage, instance @@ -3554,97 +3435,87 @@ def gen_op(): .to_serializable_subset() ) - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - with create_and_delete_test_runs(instance, [gen_events_run_id, subset_event_run_id]): - events_one, _ = _synthesize_events( - lambda: gen_op(), instance=created_instance, run_id=gen_events_run_id - ) - for event in events_one: - storage.store_event(event) + _synthesize_events(lambda: gen_op(), instance=instance, run_id=gen_events_run_id) - storage.store_event( - EventLogEntry( - error_info=None, - level="debug", - user_message="", - run_id=subset_event_run_id, - timestamp=time.time(), - dagster_event=DagsterEvent( - DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, - "nonce", - event_specific_data=AssetMaterializationPlannedData( - a, partition="2023-01-05" - ), + with create_and_delete_test_runs(instance, [subset_event_run_id]): + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=subset_event_run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, + "nonce", + event_specific_data=AssetMaterializationPlannedData( + a, partition="2023-01-05" ), - ) + ), ) + ) - single_partition_event_id = storage.get_latest_planned_materialization_info( - asset_key=a - ).storage_id + single_partition_event_id = storage.get_latest_planned_materialization_info( + asset_key=a + ).storage_id - storage.store_event( - EventLogEntry( - error_info=None, - level="debug", - user_message="", - run_id=subset_event_run_id, - timestamp=time.time(), - dagster_event=DagsterEvent( - DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, - "nonce", - event_specific_data=AssetMaterializationPlannedData( - a, partitions_subset=partitions_subset - ), + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=subset_event_run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, + "nonce", + event_specific_data=AssetMaterializationPlannedData( + a, partitions_subset=partitions_subset ), - ) + ), ) + ) - partition_subset_event_id = storage.get_latest_planned_materialization_info( - asset_key=a - ).storage_id + partition_subset_event_id = storage.get_latest_planned_materialization_info( + asset_key=a + ).storage_id - assert storage.get_materialized_partitions(a) == {"2023-01-05"} - planned_but_no_materialization_partitions = storage.get_latest_asset_partition_materialization_attempts_without_materializations( - a + assert storage.get_materialized_partitions(a) == {"2023-01-05"} + planned_but_no_materialization_partitions = storage.get_latest_asset_partition_materialization_attempts_without_materializations( + a + ) + if self.has_asset_partitions_table(): + # When an asset partitions table is present we can fetch planned but not + # materialized partitions when planned events target a partitions subset + assert len(planned_but_no_materialization_partitions) == 2 + assert planned_but_no_materialization_partitions.keys() == { + "2023-01-05", + "2023-01-06", + } + assert planned_but_no_materialization_partitions["2023-01-05"] == ( + subset_event_run_id, + partition_subset_event_id, + ) + assert planned_but_no_materialization_partitions["2023-01-06"] == ( + subset_event_run_id, + partition_subset_event_id, ) - if self.has_asset_partitions_table(): - # When an asset partitions table is present we can fetch planned but not - # materialized partitions when planned events target a partitions subset - assert len(planned_but_no_materialization_partitions) == 2 - assert planned_but_no_materialization_partitions.keys() == { - "2023-01-05", - "2023-01-06", - } - assert planned_but_no_materialization_partitions["2023-01-05"] == ( - subset_event_run_id, - partition_subset_event_id, - ) - assert planned_but_no_materialization_partitions["2023-01-06"] == ( - subset_event_run_id, - partition_subset_event_id, - ) - else: - # When an asset partitions table is not present we can only fetch planned but - # not materialized partitions when planned events target a single partition - assert planned_but_no_materialization_partitions.keys() == {"2023-01-05"} - assert planned_but_no_materialization_partitions["2023-01-05"] == ( - subset_event_run_id, - single_partition_event_id, - ) + else: + # When an asset partitions table is not present we can only fetch planned but + # not materialized partitions when planned events target a single partition + assert planned_but_no_materialization_partitions.keys() == {"2023-01-05"} + assert planned_but_no_materialization_partitions["2023-01-05"] == ( + subset_event_run_id, + single_partition_event_id, + ) - # When asset partitions table is not present get_latest_storage_id_by_partition - # only returns storage IDs for single-partition events - latest_storage_id_by_partition = storage.get_latest_storage_id_by_partition( - a, DagsterEventType.ASSET_MATERIALIZATION_PLANNED - ) - assert latest_storage_id_by_partition == { - "2023-01-05": single_partition_event_id - } + # When asset partitions table is not present get_latest_storage_id_by_partition + # only returns storage IDs for single-partition events + latest_storage_id_by_partition = storage.get_latest_storage_id_by_partition( + a, DagsterEventType.ASSET_MATERIALIZATION_PLANNED + ) + assert latest_storage_id_by_partition == {"2023-01-05": single_partition_event_id} def test_get_latest_asset_partition_materialization_attempts_without_materializations_event_ids( self, storage, instance @@ -3727,42 +3598,36 @@ def test_get_latest_asset_partition_materialization_attempts_without_materializa } ) - def test_get_observation(self, storage: EventLogStorage, test_run_id: str): + def test_get_observation(self, storage: EventLogStorage, instance): a = AssetKey(["key_a"]) + test_run_id = make_new_run_id() + @op def gen_op(): yield AssetObservation(asset_key=a, metadata={"foo": "bar"}) yield Output(1) - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) - - events_one, _ = _synthesize_events( - lambda: gen_op(), instance=instance, run_id=test_run_id - ) - for event in events_one: - storage.store_event(event) + _synthesize_events(lambda: gen_op(), instance=instance, run_id=test_run_id) - # legacy API - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_OBSERVATION, - asset_key=a, - ) + # legacy API + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_OBSERVATION, + asset_key=a, ) + ) - assert len(records) == 1 + assert len(records) == 1 - # new API - result = storage.fetch_observations(a, limit=100) - assert isinstance(result, EventRecordsResult) - assert len(result.records) == 1 - record = result.records[0] - assert record.event_log_entry.dagster_event - assert record.event_log_entry.dagster_event.asset_key == a - assert result.cursor == EventLogCursor.from_storage_id(record.storage_id).to_string() + # new API + result = storage.fetch_observations(a, limit=100) + assert isinstance(result, EventRecordsResult) + assert len(result.records) == 1 + record = result.records[0] + assert record.event_log_entry.dagster_event + assert record.event_log_entry.dagster_event.asset_key == a + assert result.cursor == EventLogCursor.from_storage_id(record.storage_id).to_string() def test_get_planned_materialization(self, storage: EventLogStorage, test_run_id: str): a = AssetKey(["key_a"]) @@ -3847,31 +3712,19 @@ def my_op(): run_id_1 = make_new_run_id() run_id_2 = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id_1, run_id_2]): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events, _ = _synthesize_events( - lambda: my_op(), instance=created_instance, run_id=run_id_1 - ) - for event in events: - storage.store_event(event) + _synthesize_events(lambda: my_op(), instance=instance, run_id=run_id_1) - assert [key] == storage.all_asset_keys() + assert [key] == storage.all_asset_keys() - if self.can_wipe(): - storage.wipe_asset(key) + storage.wipe_asset(key) - assert len(storage.all_asset_keys()) == 0 + assert len(storage.all_asset_keys()) == 0 - events, _ = _synthesize_events( - lambda: my_op(), instance=created_instance, run_id=run_id_2 - ) - for event in events: - storage.store_event(event) + events, _ = _synthesize_events(lambda: my_op(), instance=instance, run_id=run_id_2) + for event in events: + storage.store_event(event) - assert [key] == storage.all_asset_keys() + assert [key] == storage.all_asset_keys() def test_filter_on_storage_ids( self, @@ -3886,44 +3739,38 @@ def gen_op(): yield AssetMaterialization(asset_key=a, metadata={"foo": "bar"}) yield Output(1) - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) + test_run_id = make_new_run_id() - events_one, _ = _synthesize_events( - lambda: gen_op(), instance=instance, run_id=test_run_id - ) - for event in events_one: - storage.store_event(event) + _synthesize_events(lambda: gen_op(), instance=instance, run_id=test_run_id) - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=a, - ) + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, + asset_key=a, ) - assert len(records) == 1 - storage_id = records[0].storage_id + ) + assert len(records) == 1 + storage_id = records[0].storage_id - records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, storage_ids=[storage_id] - ) + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, storage_ids=[storage_id] ) - assert len(records) == 1 - assert records[0].storage_id == storage_id + ) + assert len(records) == 1 + assert records[0].storage_id == storage_id - # Assert that not providing storage IDs to filter on will still fetch events - assert ( - len( - storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - ) + # Assert that not providing storage IDs to filter on will still fetch events + assert ( + len( + storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION, ) ) - == 1 ) + == 1 + ) def test_get_asset_records( self, @@ -3938,114 +3785,109 @@ def my_asset(): def second_asset(my_asset): return 2 - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - my_asset_key = AssetKey("my_asset") - # second_asset_key = AssetKey("second_asset") - # storage.get_asset_records([my_asset_key, second_asset_key]) - - assert len(storage.get_asset_records()) == 0 - - run_id_1 = make_new_run_id() - run_id_2 = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id_1, run_id_2]): - defs = Definitions( - assets=[my_asset, second_asset], - jobs=[ - define_asset_job("one_asset_job", ["my_asset"]), - define_asset_job("two_asset_job"), - ], - ) - result = _execute_job_and_store_events( - created_instance, - storage, - defs.get_job_def("one_asset_job"), - run_id=run_id_1, - ) - records = storage.get_asset_records([my_asset_key]) + my_asset_key = AssetKey("my_asset") + # second_asset_key = AssetKey("second_asset") + # storage.get_asset_records([my_asset_key, second_asset_key]) - assert len(records) == 1 - asset_entry = records[0].asset_entry - assert asset_entry.asset_key == my_asset_key - materialize_event = next( - event for event in result.all_events if event.is_step_materialization - ) + assert len(storage.get_asset_records()) == 0 - assert asset_entry.last_materialization - assert asset_entry.last_materialization.dagster_event == materialize_event - assert asset_entry.last_run_id == result.run_id - assert asset_entry.asset_details is None - - # get the materialization from the one_asset_job run - event_log_record = storage.get_records_for_run( - run_id_1, - of_type=DagsterEventType.ASSET_MATERIALIZATION, - limit=1, - ascending=False, - ).records[0] - - if self.is_sqlite(storage): - # sqlite storages are run sharded, so the storage ids in the run-shard will be - # different from the storage ids in the index shard. We therefore cannot - # compare records directly for sqlite. But we can compare event log entries. - assert ( - asset_entry.last_materialization_record - and asset_entry.last_materialization_record.event_log_entry - == event_log_record.event_log_entry - ) - else: - assert asset_entry.last_materialization_record == event_log_record - - # get the planned materialization from the one asset_job run - materialization_planned_record = storage.get_records_for_run( - run_id_1, - of_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, - limit=1, - ascending=False, - ).records[0] - - if storage.asset_records_have_last_planned_materialization_storage_id: - assert ( - asset_entry.last_planned_materialization_storage_id - == materialization_planned_record.storage_id - ) - else: - assert not asset_entry.last_planned_materialization_storage_id + run_id_1 = make_new_run_id() + run_id_2 = make_new_run_id() + defs = Definitions( + assets=[my_asset, second_asset], + jobs=[ + define_asset_job("one_asset_job", ["my_asset"]), + define_asset_job("two_asset_job"), + ], + ) + result = _execute_job_and_store_events( + instance, + storage, + defs.get_job_def("one_asset_job"), + run_id=run_id_1, + ) + records = storage.get_asset_records([my_asset_key]) + + assert len(records) == 1 + asset_entry = records[0].asset_entry + assert asset_entry.asset_key == my_asset_key + materialize_event = next( + event for event in result.all_events if event.is_step_materialization + ) - if self.can_wipe(): - storage.wipe_asset(my_asset_key) + assert asset_entry.last_materialization + assert asset_entry.last_materialization.dagster_event == materialize_event + assert asset_entry.last_run_id == result.run_id + assert asset_entry.asset_details is None - # confirm that last_run_id is None when asset is wiped - assert len(storage.get_asset_records([my_asset_key])) == 0 + # get the materialization from the one_asset_job run + event_log_record = storage.get_records_for_run( + run_id_1, + of_type=DagsterEventType.ASSET_MATERIALIZATION, + limit=1, + ascending=False, + ).records[0] - result = _execute_job_and_store_events( - created_instance, - storage, - defs.get_job_def("two_asset_job"), - run_id=run_id_2, - ) - records = storage.get_asset_records([my_asset_key]) - assert len(records) == 1 - records = storage.get_asset_records([]) # should select no assets - assert len(records) == 0 - records = storage.get_asset_records() # should select all assets - assert len(records) == 2 - - records = list(records) - records.sort( - key=lambda record: record.asset_entry.asset_key - ) # order by asset key - asset_entry = records[0].asset_entry - assert asset_entry.asset_key == my_asset_key - materialize_event = next( - event for event in result.all_events if event.is_step_materialization - ) - assert asset_entry.last_materialization - assert asset_entry.last_materialization.dagster_event == materialize_event - assert asset_entry.last_run_id == result.run_id - assert isinstance(asset_entry.asset_details, AssetDetails) + if self.is_sqlite(storage): + # sqlite storages are run sharded, so the storage ids in the run-shard will be + # different from the storage ids in the index shard. We therefore cannot + # compare records directly for sqlite. But we can compare event log entries. + assert ( + asset_entry.last_materialization_record + and asset_entry.last_materialization_record.event_log_entry + == event_log_record.event_log_entry + ) + else: + assert asset_entry.last_materialization_record == event_log_record + + # get the planned materialization from the one asset_job run + materialization_planned_record = storage.get_records_for_run( + run_id_1, + of_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, + limit=1, + ascending=False, + ).records[0] + + if storage.asset_records_have_last_planned_materialization_storage_id: + assert ( + asset_entry.last_planned_materialization_storage_id + == materialization_planned_record.storage_id + ) + else: + assert not asset_entry.last_planned_materialization_storage_id + + storage.wipe_asset(my_asset_key) + + # confirm that last_run_id is None when asset is wiped + assert len(storage.get_asset_records([my_asset_key])) == 0 + + result = _execute_job_and_store_events( + instance, + storage, + defs.get_job_def("two_asset_job"), + run_id=run_id_2, + ) + records = storage.get_asset_records([my_asset_key]) + assert len(records) == 1 + records = storage.get_asset_records([]) # should select no assets + assert len(records) == 0 + + records = storage.get_asset_records() # should select all assets + if self.can_query_all_asset_records(): + assert len(records) == 2 + records = list(records) + records.sort(key=lambda record: record.asset_entry.asset_key) # order by asset key + asset_entry = records[0].asset_entry + assert asset_entry.asset_key == my_asset_key + materialize_event = next( + event for event in result.all_events if event.is_step_materialization + ) + assert asset_entry.last_materialization + assert asset_entry.last_materialization.dagster_event == materialize_event + assert asset_entry.last_run_id == result.run_id + assert isinstance(asset_entry.asset_details, AssetDetails) + else: + assert len(records) == 0 def test_asset_record_run_id_wiped( self, @@ -4067,42 +3909,28 @@ def observe_asset(): run_id_1 = make_new_run_id() run_id_2 = make_new_run_id() run_id_3 = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id_1, run_id_2, run_id_3]): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) + _synthesize_events(lambda: observe_asset(), instance=instance, run_id=run_id_1) + asset_entry = storage.get_asset_records([asset_key])[0].asset_entry + assert asset_entry.last_run_id is None + + _synthesize_events( + lambda: materialize_asset(), + instance=instance, + run_id=run_id_2, + ) + asset_entry = storage.get_asset_records([asset_key])[0].asset_entry + assert asset_entry.last_run_id == run_id_2 - events, result = _synthesize_events( - lambda: observe_asset(), instance=created_instance, run_id=run_id_1 - ) - for event in events: - storage.store_event(event) - asset_entry = storage.get_asset_records([asset_key])[0].asset_entry - assert asset_entry.last_run_id is None - - events, result = _synthesize_events( - lambda: materialize_asset(), - instance=created_instance, - run_id=run_id_2, - ) - for event in events: - storage.store_event(event) - asset_entry = storage.get_asset_records([asset_key])[0].asset_entry - assert asset_entry.last_run_id == result.run_id - - if self.can_wipe(): - storage.wipe_asset(asset_key) - assert len(storage.get_asset_records([asset_key])) == 0 - - events, result = _synthesize_events( - lambda: observe_asset(), - instance=created_instance, - run_id=run_id_3, - ) - for event in events: - storage.store_event(event) - asset_entry = storage.get_asset_records([asset_key])[0].asset_entry - assert asset_entry.last_run_id is None + storage.wipe_asset(asset_key) + assert len(storage.get_asset_records([asset_key])) == 0 + + _synthesize_events( + lambda: observe_asset(), + instance=instance, + run_id=run_id_3, + ) + asset_entry = storage.get_asset_records([asset_key])[0].asset_entry + assert asset_entry.last_run_id is None def test_asset_record_last_observation( self, @@ -4122,24 +3950,15 @@ def observe_asset(): yield Output(5) run_id = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id]): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - events, result = _synthesize_events( - lambda: observe_asset(), instance=created_instance, run_id=run_id - ) - for event in events: - storage.store_event(event) + _synthesize_events(lambda: observe_asset(), instance=instance, run_id=run_id) - # there is an observation - fetched_record = storage.fetch_observations(asset_key, limit=1).records[0] - assert fetched_record + # there is an observation + fetched_record = storage.fetch_observations(asset_key, limit=1).records[0] + assert fetched_record - # the observation is stored on the asset record - asset_entry = storage.get_asset_records([asset_key])[0].asset_entry - assert asset_entry.last_observation_record == fetched_record + # the observation is stored on the asset record + asset_entry = storage.get_asset_records([asset_key])[0].asset_entry + assert asset_entry.last_observation_record == fetched_record def test_last_run_id_updates_on_materialization_planned( self, @@ -4152,40 +3971,34 @@ def never_materializes_asset(): run_id_1 = make_new_run_id() run_id_2 = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id_1, run_id_2]): - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) + asset_key = AssetKey("never_materializes_asset") + never_materializes_job = Definitions( + assets=[never_materializes_asset], + ).get_implicit_global_asset_job_def() - asset_key = AssetKey("never_materializes_asset") - never_materializes_job = Definitions( - assets=[never_materializes_asset], - ).get_implicit_global_asset_job_def() - - result = _execute_job_and_store_events( - created_instance, storage, never_materializes_job, run_id=run_id_1 - ) - records = storage.get_asset_records([asset_key]) + result = _execute_job_and_store_events( + instance, storage, never_materializes_job, run_id=run_id_1 + ) + records = storage.get_asset_records([asset_key]) - assert len(records) == 1 - asset_record = records[0] - assert result.run_id == asset_record.asset_entry.last_run_id + assert len(records) == 1 + asset_record = records[0] + assert result.run_id == asset_record.asset_entry.last_run_id - if self.can_wipe(): - storage.wipe_asset(asset_key) + storage.wipe_asset(asset_key) - # confirm that last_run_id is None when asset is wiped - assert len(storage.get_asset_records([asset_key])) == 0 + # confirm that last_run_id is None when asset is wiped + assert len(storage.get_asset_records([asset_key])) == 0 - result = _execute_job_and_store_events( - created_instance, - storage, - never_materializes_job, - run_id=run_id_2, - ) - records = storage.get_asset_records([asset_key]) - assert len(records) == 1 - assert result.run_id == records[0].asset_entry.last_run_id + result = _execute_job_and_store_events( + instance, + storage, + never_materializes_job, + run_id=run_id_2, + ) + records = storage.get_asset_records([asset_key]) + assert len(records) == 1 + assert result.run_id == records[0].asset_entry.last_run_id def test_get_logs_for_all_runs_by_log_id_of_type(self, storage: EventLogStorage): if not storage.supports_event_consumer_queries(): @@ -4710,27 +4523,26 @@ def _sort_by_country_then_date(tags): {"dagster/partition/country": "US", "dagster/partition/date": "2022-10-13"}, {"dagster/partition/country": "US", "dagster/partition/date": "2022-10-13"}, ] - if self.can_wipe(): - storage.wipe_asset(key) - asset_event_tags = storage.get_event_tags_for_asset( - asset_key=key, - ) - assert asset_event_tags == [] + storage.wipe_asset(key) + asset_event_tags = storage.get_event_tags_for_asset( + asset_key=key, + ) + assert asset_event_tags == [] - events, _ = _synthesize_events(lambda: brazil_op(), run_id_2) - for event in events: - storage.store_event(event) + events, _ = _synthesize_events(lambda: brazil_op(), run_id_2) + for event in events: + storage.store_event(event) - asset_event_tags = storage.get_event_tags_for_asset( - asset_key=key, - filter_tags={ - "dagster/partition/date": "2022-10-13", - "dagster/partition/country": "Brazil", - }, - ) - assert asset_event_tags == [ - {"dagster/partition/country": "Brazil", "dagster/partition/date": "2022-10-13"} - ] + asset_event_tags = storage.get_event_tags_for_asset( + asset_key=key, + filter_tags={ + "dagster/partition/date": "2022-10-13", + "dagster/partition/country": "Brazil", + }, + ) + assert asset_event_tags == [ + {"dagster/partition/country": "Brazil", "dagster/partition/date": "2022-10-13"} + ] def test_multi_partitions_partition_deserialization( self, @@ -4759,24 +4571,15 @@ def my_op(): ) yield Output(5) - with instance_for_test() as created_instance: - if not storage.has_instance: - storage.register_instance(created_instance) - - run_id_1 = make_new_run_id() + run_id_1 = make_new_run_id() - with create_and_delete_test_runs(instance, [run_id_1]): - events_one, _ = _synthesize_events( - lambda: my_op(), instance=created_instance, run_id=run_id_1 - ) - for event in events_one: - storage.store_event(event) + _synthesize_events(lambda: my_op(), instance=instance, run_id=run_id_1) - assert created_instance.get_materialized_partitions(key) == { - MultiPartitionKey({"country": "US", "date": "2022-10-13"}), - MultiPartitionKey({"country": "Mexico", "date": "2022-10-14"}), - MultiPartitionKey({"country": "Canada", "date": "2022-10-13"}), - } + assert instance.get_materialized_partitions(key) == { + MultiPartitionKey({"country": "US", "date": "2022-10-13"}), + MultiPartitionKey({"country": "Mexico", "date": "2022-10-14"}), + MultiPartitionKey({"country": "Canada", "date": "2022-10-13"}), + } def test_store_and_wipe_cached_status( self, @@ -4785,6 +4588,9 @@ def test_store_and_wipe_cached_status( ): asset_key = AssetKey("yay") + if not self.can_write_cached_status(): + pytest.skip("storage cannot write cached status") + @op def yields_materialization(): yield AssetMaterialization(asset_key=asset_key) @@ -4833,37 +4639,34 @@ def yields_materialization(): ) assert _get_cached_status_for_asset(storage, asset_key) == cache_value - if self.can_wipe(): - cache_value = AssetStatusCacheValue( - latest_storage_id=1, - partitions_def_id=None, - serialized_materialized_partition_subset=None, - ) - storage.update_asset_cached_status_data( - asset_key=asset_key, cache_values=cache_value - ) - assert _get_cached_status_for_asset(storage, asset_key) == cache_value - record = storage.get_asset_records([asset_key])[0] - storage.wipe_asset_cached_status(asset_key) - assert _get_cached_status_for_asset(storage, asset_key) is None - post_wipe_record = storage.get_asset_records([asset_key])[0] - assert ( - record.asset_entry.last_materialization_record - == post_wipe_record.asset_entry.last_materialization_record - ) - assert record.asset_entry.last_run_id == post_wipe_record.asset_entry.last_run_id + cache_value = AssetStatusCacheValue( + latest_storage_id=1, + partitions_def_id=None, + serialized_materialized_partition_subset=None, + ) + storage.update_asset_cached_status_data(asset_key=asset_key, cache_values=cache_value) + assert _get_cached_status_for_asset(storage, asset_key) == cache_value + record = storage.get_asset_records([asset_key])[0] + storage.wipe_asset_cached_status(asset_key) + assert _get_cached_status_for_asset(storage, asset_key) is None + post_wipe_record = storage.get_asset_records([asset_key])[0] + assert ( + record.asset_entry.last_materialization_record + == post_wipe_record.asset_entry.last_materialization_record + ) + assert record.asset_entry.last_run_id == post_wipe_record.asset_entry.last_run_id - storage.wipe_asset(asset_key) - assert storage.get_asset_records() == [] + storage.wipe_asset(asset_key) + assert storage.get_asset_records([asset_key]) == [] - events, _ = _synthesize_events( - lambda: yields_materialization(), - run_id=run_id_2, - ) - for event in events: - storage.store_event(event) + events, _ = _synthesize_events( + lambda: yields_materialization(), + run_id=run_id_2, + ) + for event in events: + storage.store_event(event) - assert _get_cached_status_for_asset(storage, asset_key) is None + assert _get_cached_status_for_asset(storage, asset_key) is None def test_add_dynamic_partitions(self, storage: EventLogStorage): assert storage @@ -4928,13 +4731,9 @@ def test_has_dynamic_partition(self, storage: EventLogStorage): assert not storage.has_dynamic_partition(partitions_def_name="bar", partition_key="foo") def test_concurrency(self, storage: EventLogStorage): - assert storage if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - run_id_one = make_new_run_id() run_id_two = make_new_run_id() run_id_three = make_new_run_id() @@ -5006,9 +4805,6 @@ def claim(key, run_id, step_key, priority=0): claim_status = storage.claim_concurrency_slot(key, run_id, step_key, priority) return claim_status.slot_status - if self.can_wipe(): - storage.wipe() - storage.set_concurrency_slots("foo", 5) storage.set_concurrency_slots("bar", 1) @@ -5059,9 +4855,6 @@ def test_concurrency_allocate_from_pending(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - run_id = make_new_run_id() def claim(key, run_id, step_key, priority=0): @@ -5132,9 +4925,6 @@ def test_slot_downsize(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - run_id = make_new_run_id() def claim(key, run_id, step_key, priority=0): @@ -5165,9 +4955,6 @@ def test_slot_upsize(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - run_id = make_new_run_id() def claim(key, run_id, step_key, priority=0): @@ -5214,15 +5001,6 @@ def test_concurrency_run_ids(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if not self.can_wipe(): - # this test requires wiping - pytest.skip( - "storage does not support reading run ids for the purpose of freeing concurrency" - " slots" - ) - - storage.wipe() - one = make_new_run_id() two = make_new_run_id() @@ -5232,6 +5010,11 @@ def test_concurrency_run_ids(self, storage: EventLogStorage): storage.claim_concurrency_slot("foo", two, "b") storage.claim_concurrency_slot("foo", one, "c") + try: + storage.get_concurrency_run_ids() + except NotImplementedError: + pytest.skip("Storage does not implement get_concurrency_run_ids") + assert storage.get_concurrency_run_ids() == {one, two} storage.free_concurrency_slots_for_run(one) assert storage.get_concurrency_run_ids() == {two} @@ -5242,9 +5025,6 @@ def test_threaded_concurrency(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - TOTAL_TIMEOUT_TIME = 30 run_id = make_new_run_id() @@ -5281,9 +5061,6 @@ def test_zero_concurrency(self, storage: EventLogStorage): if not storage.supports_global_concurrency_limits: pytest.skip("storage does not support global op concurrency") - if self.can_wipe(): - storage.wipe() - run_id = make_new_run_id() # initially there are no concurrency limited keys @@ -5324,9 +5101,6 @@ def test_default_concurrency( if not self.can_set_concurrency_defaults(): pytest.skip("storage does not support setting global op concurrency defaults") - if self.can_wipe(): - storage.wipe() - self.set_default_op_concurrency(instance, storage, 1) # initially there are no concurrency limited keys @@ -5343,9 +5117,6 @@ def test_asset_checks( self, storage: EventLogStorage, ): - if self.can_wipe(): - storage.wipe() - run_id_1, run_id_2, run_id_3 = [make_new_run_id() for _ in range(3)] check_key_1 = AssetCheckKey(AssetKey(["my_asset"]), "my_check") check_key_2 = AssetCheckKey(AssetKey(["my_asset"]), "my_check_2") @@ -5487,9 +5258,6 @@ def test_asset_checks( assert latest_checks[check_key_2].run_id == run_id_3 def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage): - if self.can_wipe(): - storage.wipe() - run_id = make_new_run_id() for _ in range(2): storage.store_event( @@ -5602,9 +5370,6 @@ def test_asset_check_summary_record( storage: EventLogStorage, instance: DagsterInstance, ) -> None: - if self.can_wipe(): - storage.wipe() - run_id_0, run_id_1 = [make_new_run_id() for _ in range(2)] with create_and_delete_test_runs(instance, [run_id_0, run_id_1]): check_key_1 = AssetCheckKey(AssetKey(["my_asset"]), "my_check") @@ -5855,10 +5620,6 @@ class BlowUp(Exception): ... assert len(storage.get_logs_for_run(test_run_id)) == 1 - if self.can_wipe(): - storage.wipe() - assert len(storage.get_logs_for_run(test_run_id)) == 0 - def test_asset_tags_to_insert(self, test_run_id: str, storage: EventLogStorage): key = AssetKey("test_asset") storage.store_event(