diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index baca090978157..ea4712b24d17c 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -1280,6 +1280,14 @@ def materialize(_): yield AssetMaterialization( asset_key=other_asset_key, metadata={"count": 1}, partition="z" ) + + yield AssetMaterialization( + asset_key=asset_key, metadata={"count": 1}, partition="dupe_materialization" + ) + yield AssetMaterialization( + asset_key=asset_key, metadata={"count": 2}, partition="dupe_materialization" + ) + yield Output(1) def _ops(): @@ -1298,10 +1306,10 @@ def _get_counts(result): # results come in descending order, by default result = storage.fetch_materializations(asset_key, limit=100) - assert _get_counts(result) == [5, 4, 3, 2, 1] + assert _get_counts(result) == [2, 1, 5, 4, 3, 2, 1] result = storage.fetch_materializations(asset_key, limit=3) - assert _get_counts(result) == [5, 4, 3] + assert _get_counts(result) == [2, 1, 5] # results come in ascending order, limited result = storage.fetch_materializations(asset_key, limit=3, ascending=True) @@ -1323,7 +1331,7 @@ def _get_counts(result): ), limit=100, ) - assert _get_counts(result) == [5, 4, 3, 2] + assert _get_counts(result) == [2, 1, 5, 4, 3, 2] # filter by before storage id result = storage.fetch_materializations( @@ -1387,6 +1395,34 @@ def _get_counts(result): ) assert _get_counts(result) == [5, 3, 1] + # fetch most recent event for a given partition + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["dupe_materialization"], + ), + limit=1, + ) + assert _get_counts(result) == [2] + + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["5"], + ), + limit=1, + ) + assert _get_counts(result) == [5] + + result = storage.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["non_existant"], + ), + limit=1, + ) + assert _get_counts(result) == [] + def test_asset_observation_fetch(self, storage, instance): asset_key = AssetKey(["path", "to", "asset_one"]) @@ -1399,6 +1435,14 @@ def observe(_): yield AssetObservation(asset_key=asset_key, metadata={"count": 3}, partition="3") yield AssetObservation(asset_key=asset_key, metadata={"count": 4}, partition="4") yield AssetObservation(asset_key=asset_key, metadata={"count": 5}, partition="5") + + yield AssetObservation( + asset_key=asset_key, metadata={"count": 1}, partition="dupe_observation" + ) + yield AssetObservation( + asset_key=asset_key, metadata={"count": 2}, partition="dupe_observation" + ) + yield Output(1) def _ops(): @@ -1416,10 +1460,10 @@ def _get_counts(result): # results come in descending order, by default result = storage.fetch_observations(asset_key, limit=100) - assert _get_counts(result) == [5, 4, 3, 2, 1] + assert _get_counts(result) == [2, 1, 5, 4, 3, 2, 1] result = storage.fetch_observations(asset_key, limit=3) - assert _get_counts(result) == [5, 4, 3] + assert _get_counts(result) == [2, 1, 5] # results come in ascending order, limited result = storage.fetch_observations(asset_key, limit=3, ascending=True) @@ -1438,7 +1482,7 @@ def _get_counts(result): ), limit=100, ) - assert _get_counts(result) == [5, 4, 3, 2] + assert _get_counts(result) == [2, 1, 5, 4, 3, 2] # filter by before storage id result = storage.fetch_observations( @@ -1502,6 +1546,34 @@ def _get_counts(result): ) assert _get_counts(result) == [5, 3, 1] + # fetch most recent event for a given partition + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["dupe_observation"], + ), + limit=1, + ) + assert _get_counts(result) == [2] + + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["5"], + ), + limit=1, + ) + assert _get_counts(result) == [5] + + result = storage.fetch_observations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=["non_existant"], + ), + limit=1, + ) + assert _get_counts(result) == [] + def test_asset_materialization_null_key_fails(self): with pytest.raises(check.CheckError): AssetMaterialization(asset_key=None)