Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[runless] report_asset_materialization endpoint #16602

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from dagster import __version__ as dagster_version
from dagster._annotations import deprecated
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
Expand Down Expand Up @@ -195,6 +200,107 @@ async def download_captured_logs_endpoint(self, request: Request):
filebase = "__".join(log_key)
return FileResponse(location, filename=f"{filebase}.{file_extension}")

async def report_asset_materialization_endpoint(self, request: Request) -> JSONResponse:
    """Record a runless asset materialization event.

    The asset key is read from the url path (``/`` delimits the parts of a
    multipart key), then from the json body, then from the ``asset_key`` query
    param. The remaining properties (``data_version``, ``partition``,
    ``description``, ``metadata``) are read from the json post body first and
    from the query params second.

    Args:
        request: The incoming request. A body, if present, must be json.

    Returns:
        An empty json object once the event has been reported, or a json
        object carrying an ``error`` message with status code 400 when the
        request cannot be turned into an AssetMaterialization.
    """
    context = self.make_request_context(request)

    body_content_type = request.headers.get("content-type")
    if body_content_type is None:
        json_body = {}
    elif body_content_type.split(";", 1)[0].strip().lower() == "application/json":
        # Only the media type is compared, so parameters such as
        # "; charset=utf-8" no longer cause a valid json body to be rejected.
        try:
            json_body = await request.json()
        except ValueError as exc:
            # Malformed json is a client error, not a server error.
            # (json.JSONDecodeError and UnicodeDecodeError derive from ValueError.)
            return JSONResponse(
                {
                    "error": f"Error parsing json body: {exc}",
                },
                status_code=400,
            )
        if not isinstance(json_body, dict):
            # Everything below indexes the body by property name.
            return JSONResponse(
                {
                    "error": (
                        "Expected json body to be an object, got"
                        f" {type(json_body).__name__}"
                    ),
                },
                status_code=400,
            )
    else:
        return JSONResponse(
            {
                "error": (
                    f"Unhandled content type {body_content_type}, expect no body or"
                    " application/json"
                ),
            },
            status_code=400,
        )

    def lookup(name: str):
        """Return ``(found, value)`` for ``name``; the json body wins over query params."""
        if name in json_body:
            return True, json_body[name]
        if name in request.query_params:
            return True, request.query_params[name]
        return False, None

    asset_key = None
    path_asset_key = request.path_params.get(ReportAssetMatParam.asset_key)
    if path_asset_key:
        # use from_user_string to treat / as multipart key separator
        asset_key = AssetKey.from_user_string(path_asset_key)
    elif ReportAssetMatParam.asset_key in json_body:
        asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key])
    elif ReportAssetMatParam.asset_key in request.query_params:
        asset_key = AssetKey.from_db_string(
            request.query_params[ReportAssetMatParam.asset_key]
        )

    if asset_key is None:
        return JSONResponse(
            {
                "error": (
                    "Empty asset key, must provide asset key as url path after"
                    " /report_asset_materialization/ or query param asset_key."
                ),
            },
            status_code=400,
        )

    # NOTE(review): this data_version handling is pending confirmation from @smackesey.
    # Presence (not truthiness) decides whether the tags are attached, so an
    # explicitly supplied value is always forwarded to AssetMaterialization.
    has_data_version, data_version = lookup(ReportAssetMatParam.data_version)
    tags = None
    if has_data_version:
        tags = {
            DATA_VERSION_TAG: data_version,
            DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
        }

    _, partition = lookup(ReportAssetMatParam.partition)
    _, description = lookup(ReportAssetMatParam.description)

    # metadata is structured: taken as-is from the json body, but has to be
    # json-decoded when it arrives as a query param string.
    metadata = None
    if ReportAssetMatParam.metadata in json_body:
        metadata = json_body[ReportAssetMatParam.metadata]
    elif ReportAssetMatParam.metadata in request.query_params:
        try:
            metadata = json.loads(request.query_params[ReportAssetMatParam.metadata])
        except Exception as exc:
            return JSONResponse(
                {
                    "error": f"Error parsing metadata json: {exc}",
                },
                status_code=400,
            )

    # Validation of the individual values is delegated to the
    # AssetMaterialization constructor; any failure is reported as a 400.
    try:
        mat = AssetMaterialization(
            asset_key=asset_key,
            partition=partition,
            metadata=metadata,
            description=description,
            tags=tags,
        )
    except Exception as exc:
        return JSONResponse(
            {
                "error": f"Error constructing AssetMaterialization: {exc}",
            },
            status_code=400,
        )

    context.instance.report_runless_asset_event(mat)

    return JSONResponse({})

def index_html_endpoint(self, request: Request):
"""Serves root html."""
index_path = self.relative_path("webapp/build/index.html")
Expand Down Expand Up @@ -304,6 +410,11 @@ def build_routes(self):
"/download_debug/{run_id:str}",
self.download_debug_file_endpoint,
),
Route(
"/report_asset_materialization/{asset_key:path}",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

differs from report_runless_asset_event naming since we are encoding the event type as part of the url path for ease of use

self.report_asset_materialization_endpoint,
methods=["POST"],
),
Route("/{path:path}", self.index_html_endpoint),
Route("/", self.index_html_endpoint),
]
Expand Down Expand Up @@ -345,3 +456,17 @@ def send_wrapper(message: Message):
return send(message)

await self.app(scope, receive, send_wrapper)


class ReportAssetMatParam:
    """Names of the arguments supported by the report_asset_materialization endpoint.

    Collected in one place to ensure consistency with related APIs.

    Note: this is deliberately a plain class rather than an Enum. An
    ``X(str, Enum)`` mixin does not work here because ``partition`` conflicts
    with a keyword, which leads to value type problems.
    """

    # asset to report the materialization for
    asset_key = "asset_key"
    # user provided data version for the materialization
    data_version = "data_version"
    # metadata to attach to the materialization
    metadata = "metadata"
    # human readable description of the materialization
    description = "description"
    # partition the materialization applies to
    partition = "partition"
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import gc
import inspect

import objgraph
import pytest
from dagster import (
DagsterInstance,
__version__ as dagster_version,
job,
op,
)
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.events import DagsterEventType
from dagster._serdes import unpack_value
from dagster._seven import json
from dagster._utils.error import SerializableErrorInfo
from dagster_graphql.version import __version__ as dagster_graphql_version
from dagster_pipes import PipesContext
from dagster_webserver.graphql import GraphQLWS
from dagster_webserver.version import __version__ as dagster_webserver_version
from dagster_webserver.webserver import ReportAssetMatParam
from starlette.testclient import TestClient

EVENT_LOG_SUBSCRIPTION = """
Expand Down Expand Up @@ -278,3 +287,169 @@ def test_download_compute(instance, test_client: TestClient):

response = test_client.get(f"/download/{run_id}/jonx/stdout")
assert response.status_code == 404


def test_runless_events(instance: DagsterInstance, test_client: TestClient):
    """Exercise the report_asset_materialization endpoint end to end.

    Covers each way of passing the asset key (url path, query param, json
    body), reporting every property through the json body and through query
    params, and the 400 responses returned for invalid metadata.
    """
    # base case: single part key in the url path
    asset_name = "my_asset"
    asset_url = f"/report_asset_materialization/{asset_name}"
    resp = test_client.post(asset_url)
    assert resp.status_code == 200
    event = instance.get_latest_materialization_event(AssetKey(asset_name))
    assert event

    # empty asset key path
    resp = test_client.post("/report_asset_materialization/")
    assert resp.status_code == 400

    # multipart key
    multipart_key = AssetKey(["foo", "bar", "baz"])
    resp = test_client.post(
        # url / delimiter for multipart keys
        "/report_asset_materialization/foo/bar/baz",  # multipart_key.to_user_string()
    )
    assert resp.status_code == 200
    event = instance.get_latest_materialization_event(multipart_key)
    assert event

    # slash in key: cannot go in the url path, so pass it as a query param
    key_with_slash = AssetKey("slash/key")
    resp = test_client.post(
        "/report_asset_materialization/",
        params={"asset_key": '["slash/key"]'},  # key_with_slash.to_string(),
    )
    assert resp.status_code == 200
    event = instance.get_latest_materialization_event(key_with_slash)
    assert event

    # multi part with slashes: pass the parts through the json body
    multipart_key_with_slashes = AssetKey(["a/b", "c/d"])
    resp = test_client.post(
        "/report_asset_materialization/",
        json={
            "asset_key": ["a/b", "c/d"],  # same args passed to AssetKey
        },
    )
    assert resp.status_code == 200
    event = instance.get_latest_materialization_event(multipart_key_with_slashes)
    assert event

    # materialization every "kitchen sink" request below is expected to produce
    meta = {"my_metadata": "value"}
    expected_mat = AssetMaterialization(
        asset_key=asset_name,
        partition="2021-09-23",
        description="cutest",
        metadata=meta,
        tags={
            DATA_VERSION_TAG: "new",
            DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
        },
    )

    # kitchen sink json body
    json_payload = {
        "description": expected_mat.description,
        "partition": expected_mat.partition,
        "metadata": meta,  # handled separately to avoid MetadataValue ish
        "data_version": "new",
    }
    resp = test_client.post(asset_url, json=json_payload)
    assert resp.status_code == 200
    event = instance.get_latest_materialization_event(AssetKey(asset_name))
    assert event
    assert event.asset_materialization
    assert event.asset_materialization == expected_mat

    # kitchen sink query params
    query_payload = {
        "description": expected_mat.description,
        "partition": expected_mat.partition,
        "metadata": json.dumps(meta),
        "data_version": "new",
    }
    resp = test_client.post(asset_url, params=query_payload)
    assert resp.status_code == 200, resp.json()
    event = instance.get_latest_materialization_event(AssetKey(asset_name))
    assert event
    assert event.asset_materialization
    assert event.asset_materialization == expected_mat

    # bad metadata: query param that is not valid json
    resp = test_client.post(
        asset_url,
        params={
            "metadata": meta,  # not json encoded
        },  # type: ignore
    )
    assert resp.status_code == 400
    assert "Error parsing metadata json" in resp.json()["error"]

    # bad metadata: valid json body, but metadata has the wrong type
    resp = test_client.post(
        asset_url,
        json={
            "metadata": "im_just_a_string",
        },
    )
    assert resp.status_code == 400
    error_message = resp.json()["error"]
    assert (
        'Error constructing AssetMaterialization: Param "metadata" is not' in error_message
    )


def test_report_asset_materialization_apis_consistent(
    instance: DagsterInstance, test_client: TestClient
):
    """Ensure the ext report_asset_materialization and the API endpoint have the same capabilities.

    Posts a payload containing every supported endpoint param, checks that each
    value ends up on the stored materialization, then compares the payload keys
    against the signature of PipesContext.report_asset_materialization.
    """
    sample_payload = {
        "asset_key": "sample_key",
        "metadata": {"meta": "data"},
        "data_version": "so_new",
        "partition": "2023-09-23",
        "description": "boo",
    }

    # sample has an entry for all supported params (banking on every supported
    # param being declared as an attribute of ReportAssetMatParam)
    supported_params = {
        v for k, v in vars(ReportAssetMatParam).items() if not k.startswith("__")
    }
    assert set(sample_payload) == supported_params

    response = test_client.post("/report_asset_materialization/", json=sample_payload)
    assert response.status_code == 200
    evt = instance.get_latest_materialization_event(AssetKey(sample_payload["asset_key"]))
    assert evt
    mat = evt.asset_materialization
    assert mat

    for k, v in sample_payload.items():
        if k == "asset_key":
            assert mat.asset_key == AssetKey(v)
        elif k == "metadata":
            # only the keys are compared; the stored values are not checked here
            assert mat.metadata.keys() == v.keys()
        elif k == "data_version":
            tags = mat.tags
            assert tags
            assert tags[DATA_VERSION_TAG] == v
            assert tags[DATA_VERSION_IS_USER_PROVIDED_TAG]
        elif k == "partition":
            assert mat.partition == v
        elif k == "description":
            assert mat.description == v
        else:
            # Raise explicitly rather than `assert False`, which is removed when
            # python runs with -O and would let an unvalidated param slip through.
            raise AssertionError(
                "need to add validation that sample payload content was written successfully"
            )

    # all ext report_asset_materialization kwargs should be in sample payload
    sig = inspect.signature(PipesContext.report_asset_materialization)
    skip_set = {"self"}
    params = {p for p in sig.parameters if p not in skip_set}

    # params accepted by the endpoint but not by the ext API
    KNOWN_DIFF = {"partition", "description"}

    assert set(sample_payload) - params == KNOWN_DIFF