Skip to content

Commit

Permalink
New test case that includes fetching the most recent asset+partition …
Browse files Browse the repository at this point in the history
…combo for materializations and observations (dagster-io#24198)

Test Plan: BK

## Summary & Motivation

## How I Tested These Changes

## Changelog [New | Bug | Docs]

> Replace this message with a changelog entry, or `NOCHANGELOG`
  • Loading branch information
gibsondan authored Sep 4, 2024
1 parent 96caa6b commit d03552d
Showing 1 changed file with 78 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,14 @@ def materialize(_):
yield AssetMaterialization(
asset_key=other_asset_key, metadata={"count": 1}, partition="z"
)

yield AssetMaterialization(
asset_key=asset_key, metadata={"count": 1}, partition="dupe_materialization"
)
yield AssetMaterialization(
asset_key=asset_key, metadata={"count": 2}, partition="dupe_materialization"
)

yield Output(1)

def _ops():
Expand All @@ -1298,10 +1306,10 @@ def _get_counts(result):

# results come in descending order, by default
result = storage.fetch_materializations(asset_key, limit=100)
assert _get_counts(result) == [5, 4, 3, 2, 1]
assert _get_counts(result) == [2, 1, 5, 4, 3, 2, 1]

result = storage.fetch_materializations(asset_key, limit=3)
assert _get_counts(result) == [5, 4, 3]
assert _get_counts(result) == [2, 1, 5]

# results come in ascending order, limited
result = storage.fetch_materializations(asset_key, limit=3, ascending=True)
Expand All @@ -1323,7 +1331,7 @@ def _get_counts(result):
),
limit=100,
)
assert _get_counts(result) == [5, 4, 3, 2]
assert _get_counts(result) == [2, 1, 5, 4, 3, 2]

# filter by before storage id
result = storage.fetch_materializations(
Expand Down Expand Up @@ -1387,6 +1395,34 @@ def _get_counts(result):
)
assert _get_counts(result) == [5, 3, 1]

# fetch most recent event for a given partition
result = storage.fetch_materializations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["dupe_materialization"],
),
limit=1,
)
assert _get_counts(result) == [2]

result = storage.fetch_materializations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["5"],
),
limit=1,
)
assert _get_counts(result) == [5]

result = storage.fetch_materializations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["non_existant"],
),
limit=1,
)
assert _get_counts(result) == []

def test_asset_observation_fetch(self, storage, instance):
asset_key = AssetKey(["path", "to", "asset_one"])

Expand All @@ -1399,6 +1435,14 @@ def observe(_):
yield AssetObservation(asset_key=asset_key, metadata={"count": 3}, partition="3")
yield AssetObservation(asset_key=asset_key, metadata={"count": 4}, partition="4")
yield AssetObservation(asset_key=asset_key, metadata={"count": 5}, partition="5")

yield AssetObservation(
asset_key=asset_key, metadata={"count": 1}, partition="dupe_observation"
)
yield AssetObservation(
asset_key=asset_key, metadata={"count": 2}, partition="dupe_observation"
)

yield Output(1)

def _ops():
Expand All @@ -1416,10 +1460,10 @@ def _get_counts(result):

# results come in descending order, by default
result = storage.fetch_observations(asset_key, limit=100)
assert _get_counts(result) == [5, 4, 3, 2, 1]
assert _get_counts(result) == [2, 1, 5, 4, 3, 2, 1]

result = storage.fetch_observations(asset_key, limit=3)
assert _get_counts(result) == [5, 4, 3]
assert _get_counts(result) == [2, 1, 5]

# results come in ascending order, limited
result = storage.fetch_observations(asset_key, limit=3, ascending=True)
Expand All @@ -1438,7 +1482,7 @@ def _get_counts(result):
),
limit=100,
)
assert _get_counts(result) == [5, 4, 3, 2]
assert _get_counts(result) == [2, 1, 5, 4, 3, 2]

# filter by before storage id
result = storage.fetch_observations(
Expand Down Expand Up @@ -1502,6 +1546,34 @@ def _get_counts(result):
)
assert _get_counts(result) == [5, 3, 1]

# fetch most recent event for a given partition
result = storage.fetch_observations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["dupe_observation"],
),
limit=1,
)
assert _get_counts(result) == [2]

result = storage.fetch_observations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["5"],
),
limit=1,
)
assert _get_counts(result) == [5]

result = storage.fetch_observations(
AssetRecordsFilter(
asset_key=asset_key,
asset_partitions=["non_existant"],
),
limit=1,
)
assert _get_counts(result) == []

def test_asset_materialization_null_key_fails(self):
with pytest.raises(check.CheckError):
AssetMaterialization(asset_key=None)
Expand Down

0 comments on commit d03552d

Please sign in to comment.