Skip to content

Commit

Permalink
[runless] report_asset_materialization endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld authored and Ramshackle-Jamathon committed Oct 3, 2023
1 parent 56994a4 commit 22219ad
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 0 deletions.
125 changes: 125 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from dagster import __version__ as dagster_version
from dagster._annotations import deprecated
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
Expand Down Expand Up @@ -195,6 +200,107 @@ async def download_captured_logs_endpoint(self, request: Request):
filebase = "__".join(log_key)
return FileResponse(location, filename=f"{filebase}.{file_extension}")

async def report_asset_materialization_endpoint(self, request: Request) -> Response:
# Record a runless asset materialization event.
# The asset key is passed as url path with / delimiting parts or as a query param.
# Properties can be passed as json post body or query params, with that order of precedence.

context = self.make_request_context(request)

body_content_type = request.headers.get("content-type")
if body_content_type is None:
json_body = {}
elif body_content_type == "application/json":
json_body = await request.json()
else:
return JSONResponse(
{
"error": (
f"Unhandled content type {body_content_type}, expect no body or"
" application/json"
),
},
status_code=400,
)

asset_key = None
if request.path_params.get(ReportAssetMatParam.asset_key):
# use from_user_string to treat / as multipart key separator
asset_key = AssetKey.from_user_string(request.path_params["asset_key"])
elif ReportAssetMatParam.asset_key in json_body:
asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key])
elif ReportAssetMatParam.asset_key in request.query_params:
asset_key = AssetKey.from_db_string(request.query_params["asset_key"])

if asset_key is None:
return JSONResponse(
{
"error": (
"Empty asset key, must provide asset key as url path after"
" /report_asset_materialization/ or query param asset_key."
),
},
status_code=400,
)

tags = None
if ReportAssetMatParam.data_version in json_body:
tags = {
DATA_VERSION_TAG: json_body[ReportAssetMatParam.data_version],
DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
}
elif ReportAssetMatParam.data_version in request.query_params:
tags = {
DATA_VERSION_TAG: request.query_params[ReportAssetMatParam.data_version],
DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
}

partition = None
if ReportAssetMatParam.partition in json_body:
partition = json_body[ReportAssetMatParam.partition]
elif ReportAssetMatParam.partition in request.query_params:
partition = request.query_params[ReportAssetMatParam.partition]

description = None
if ReportAssetMatParam.description in json_body:
description = json_body[ReportAssetMatParam.description]
elif ReportAssetMatParam.description in request.query_params:
description = request.query_params[ReportAssetMatParam.description]

metadata = None
if ReportAssetMatParam.metadata in json_body:
metadata = json_body["metadata"]
elif ReportAssetMatParam.metadata in request.query_params:
try:
metadata = json.loads(request.query_params[ReportAssetMatParam.metadata])
except Exception as exc:
return JSONResponse(
{
"error": f"Error parsing metadata json: {exc}",
},
status_code=400,
)

try:
mat = AssetMaterialization(
asset_key=asset_key,
partition=partition,
metadata=metadata,
description=description,
tags=tags,
)
except Exception as exc:
return JSONResponse(
{
"error": f"Error constructing AssetMaterialization: {exc}",
},
status_code=400,
)

context.instance.report_runless_asset_event(mat)

return JSONResponse({})

def index_html_endpoint(self, request: Request):
"""Serves root html."""
index_path = self.relative_path("webapp/build/index.html")
Expand Down Expand Up @@ -304,6 +410,11 @@ def build_routes(self):
"/download_debug/{run_id:str}",
self.download_debug_file_endpoint,
),
Route(
"/report_asset_materialization/{asset_key:path}",
self.report_asset_materialization_endpoint,
methods=["POST"],
),
Route("/{path:path}", self.index_html_endpoint),
Route("/", self.index_html_endpoint),
]
Expand Down Expand Up @@ -345,3 +456,17 @@ def send_wrapper(message: Message):
return send(message)

await self.app(scope, receive, send_wrapper)


class ReportAssetMatParam:
"""Class to collect all supported args by report_asset_materialization endpoint
to ensure consistency with related APIs.
note: Enum not used to avoid value type problems X(str, Enum) doesn't work as partition conflicts with keyword
"""

asset_key = "asset_key"
data_version = "data_version"
metadata = "metadata"
description = "description"
partition = "partition"
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import gc
import inspect

import objgraph
import pytest
from dagster import (
DagsterInstance,
__version__ as dagster_version,
job,
op,
)
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.events import DagsterEventType
from dagster._serdes import unpack_value
from dagster._seven import json
from dagster._utils.error import SerializableErrorInfo
from dagster_ext import ExtContext
from dagster_graphql.version import __version__ as dagster_graphql_version
from dagster_webserver.graphql import GraphQLWS
from dagster_webserver.version import __version__ as dagster_webserver_version
from dagster_webserver.webserver import ReportAssetMatParam
from starlette.testclient import TestClient

EVENT_LOG_SUBSCRIPTION = """
Expand Down Expand Up @@ -278,3 +287,170 @@ def test_download_compute(instance, test_client: TestClient):

response = test_client.get(f"/download/{run_id}/jonx/stdout")
assert response.status_code == 404


def test_runless_events(instance: DagsterInstance, test_client: TestClient):
# base case
my_asset_key = "my_asset"
response = test_client.post(f"/report_asset_materialization/{my_asset_key}")
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt

# empty asset key path
response = test_client.post("/report_asset_materialization/")
assert response.status_code == 400

# multipart key
long_key = AssetKey(["foo", "bar", "baz"])
response = test_client.post(
# url / delimiter for multipart keys
"/report_asset_materialization/foo/bar/baz", # long_key.to_user_string()
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(long_key)
assert evt

# slash in key (have to use query param)
slash_key = AssetKey("slash/key")
response = test_client.post(
# have to urlencode / that are part of they key
"/report_asset_materialization/",
params={"asset_key": '["slash/key"]'}, # slash_key.to_string(),
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(slash_key)
assert evt

# multi part with slashes (have to use query param)
nasty_key = AssetKey(["a/b", "c/d"])
response = test_client.post(
# have to urlencode / that are part of they key
"/report_asset_materialization/",
json={
"asset_key": ["a/b", "c/d"], # same args passed to AssetKey
},
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(nasty_key)
assert evt

meta = {"my_metadata": "value"}
mat = AssetMaterialization(
asset_key=my_asset_key,
partition="2021-09-23",
description="cutest",
metadata=meta,
tags={
DATA_VERSION_TAG: "new",
DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
},
)

# kitchen sink json body
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"description": mat.description,
"partition": mat.partition,
"metadata": meta, # handled separately to avoid MetadataValue ish
"data_version": "new",
},
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# kitchen sink query params
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"description": mat.description,
"partition": mat.partition,
"metadata": json.dumps(meta),
"data_version": "new",
},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# bad metadata
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"metadata": meta, # not json encoded
}, # type: ignore
)
assert response.status_code == 400
assert "Error parsing metadata json" in response.json()["error"]

response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"metadata": "im_just_a_string",
},
)
assert response.status_code == 400
assert (
'Error constructing AssetMaterialization: Param "metadata" is not'
in response.json()["error"]
)


def test_report_asset_materialization_apis_consistent(
instance: DagsterInstance, test_client: TestClient
):
# ensure the ext report_asset_materialization and the API endpoint have the same capabilities
sample_payload = {
"asset_key": "sample_key",
"metadata": {"meta": "data"},
"data_version": "so_new",
"partition": "2023-09-23",
"description": "boo",
}

# sample has entry for all supported params (banking on usage of enum)
assert set(sample_payload.keys()) == set(
{v for k, v in vars(ReportAssetMatParam).items() if not k.startswith("__")}
)

response = test_client.post("/report_asset_materialization/", json=sample_payload)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(sample_payload["asset_key"]))
assert evt
mat = evt.asset_materialization
assert mat

for k, v in sample_payload.items():
if k == "asset_key":
assert mat.asset_key == AssetKey(v)
elif k == "metadata":
assert mat.metadata.keys() == v.keys()
elif k == "data_version":
tags = mat.tags
assert tags
assert tags[DATA_VERSION_TAG] == v
assert tags[DATA_VERSION_IS_USER_PROVIDED_TAG]
elif k == "partition":
assert mat.partition == v
elif k == "description":
assert mat.description == v
else:
assert (
False
), "need to add validation that sample payload content was written successfully"

# all ext report_asset_materialization kwargs should be in sample payload
sig = inspect.signature(ExtContext.report_asset_materialization)
skip_set = {"self"}
params = [p for p in sig.parameters if p not in skip_set]

# things that are missing on ExtContext.report_asset_materialization
KNOWN_DIFF = {"partition", "description"}

assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF
1 change: 1 addition & 0 deletions python_modules/dagster-webserver/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ deps =
-e ../dagster-pipes
-e ../dagster-graphql
-e .[notebook,test]
-e ../dagster-ext-process

allowlist_externals =
/bin/bash
Expand Down

0 comments on commit 22219ad

Please sign in to comment.