Skip to content

Commit

Permalink
[runless] report_asset_materialization endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 20, 2023
1 parent 25f11ba commit 5473048
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 0 deletions.
94 changes: 94 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dagster import __version__ as dagster_version
from dagster._annotations import deprecated
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
Expand Down Expand Up @@ -197,6 +198,94 @@ async def download_captured_logs_endpoint(self, request: Request):
filebase = "__".join(log_key)
return FileResponse(location, filename=f"{filebase}.{file_extension}")

async def report_asset_materialization_endpoint(self, request: Request) -> Response:
# Record a runless asset materialization event.
# The asset key is passed as url path with / delimiting parts or as a query param.
# Properties can be passed as json post body or query params, with that order of precedence.

context = self.make_request_context(request)

body_content_type = request.headers.get("content-type")
if body_content_type is None:
json_body = {}
elif body_content_type == "application/json":
json_body = await request.json()
else:
return JSONResponse(
{
"error": (
f"Unhandled content type {body_content_type}, expect no body or"
" application/json"
),
},
status_code=400,
)

asset_key = None
if request.path_params.get("asset_key"):
# use from_user_string to treat / as multipart key separator
asset_key = AssetKey.from_user_string(request.path_params["asset_key"])
elif "asset_key" in json_body:
asset_key = AssetKey(json_body["asset_key"])
elif "asset_key" in request.query_params:
asset_key = AssetKey.from_db_string(request.query_params["asset_key"])

if asset_key is None:
return JSONResponse(
{
"error": (
"Empty asset key, must provide asset key as url path after"
" /report_asset_materialization/ or query param asset_key."
),
},
status_code=400,
)

description = None
if "description" in json_body:
description = json_body["description"]
elif "description" in request.query_params:
description = request.query_params["description"]

partition = None
if "partition" in json_body:
partition = json_body["partition"]
elif "partition" in request.query_params:
partition = request.query_params["partition"]

metadata = None
if "metadata" in json_body:
metadata = json_body["metadata"]
elif "metadata" in request.query_params:
try:
metadata = json.loads(request.query_params["metadata"])
except Exception as exc:
return JSONResponse(
{
"error": f"Error parsing metadata json: {exc}",
},
status_code=400,
)

try:
mat = AssetMaterialization(
asset_key=asset_key,
description=description,
partition=partition,
metadata=metadata,
)
except Exception as exc:
return JSONResponse(
{
"error": f"Error constructing AssetMaterialization: {exc}",
},
status_code=400,
)

context.instance.report_runless_asset_event(mat)

return JSONResponse({})

def index_html_endpoint(self, request: Request):
"""Serves root html."""
index_path = self.relative_path("webapp/build/index.html")
Expand Down Expand Up @@ -323,6 +412,11 @@ def build_routes(self):
"/download_debug/{run_id:str}",
self.download_debug_file_endpoint,
),
Route(
"/report_asset_materialization/{asset_key:path}",
self.report_asset_materialization_endpoint,
methods=["POST"],
),
Route("/{path:path}", self.index_html_endpoint),
Route("/", self.index_html_endpoint),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import objgraph
import pytest
from dagster import (
DagsterInstance,
__version__ as dagster_version,
job,
op,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.events import DagsterEventType
from dagster._serdes import unpack_value
from dagster._seven import json
Expand Down Expand Up @@ -278,3 +280,110 @@ def test_download_compute(instance, test_client: TestClient):

response = test_client.get(f"/download/{run_id}/jonx/stdout")
assert response.status_code == 404


def test_runless_events(instance: DagsterInstance, test_client: TestClient):
# base case
my_asset_key = "my_asset"
response = test_client.post(f"/report_asset_materialization/{my_asset_key}")
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt

# empty asset key path
response = test_client.post("/report_asset_materialization/")
assert response.status_code == 400

# multipart key
long_key = AssetKey(["foo", "bar", "baz"])
response = test_client.post(
# url / delimiter for multipart keys
"/report_asset_materialization/foo/bar/baz", # long_key.to_user_string()
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(long_key)
assert evt

# slash in key (have to use query param)
slash_key = AssetKey("slash/key")
response = test_client.post(
# have to urlencode / that are part of they key
"/report_asset_materialization/",
params={"asset_key": '["slash/key"]'}, # slash_key.to_string(),
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(slash_key)
assert evt

# multi part with slashes (have to use query param)
nasty_key = AssetKey(["a/b", "c/d"])
response = test_client.post(
# have to urlencode / that are part of they key
"/report_asset_materialization/",
json={
"asset_key": ["a/b", "c/d"], # same args passed to AssetKey
},
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(nasty_key)
assert evt

meta = {"my_metadata": "value"}
mat = AssetMaterialization(
asset_key=my_asset_key,
partition="2021-09-23",
description="cutest",
metadata=meta,
)

# kitchen sink json body
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"description": mat.description,
"partition": mat.partition,
"metadata": meta, # handled separately to avoid MetadataValue ish
},
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# kitchen sink query params
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"description": mat.description,
"partition": mat.partition,
"metadata": json.dumps(meta),
},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# bad metadata
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"metadata": meta, # not json encoded
}, # type: ignore
)
assert response.status_code == 400
assert "Error parsing metadata json" in response.json()["error"]

response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"metadata": "im_just_a_string",
},
)
assert response.status_code == 400
assert (
'Error constructing AssetMaterialization: Param "metadata" is not'
in response.json()["error"]
)

0 comments on commit 5473048

Please sign in to comment.