Skip to content

Commit

Permalink
[runless] report_asset_materialization endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 18, 2023
1 parent eb86610 commit 46c8ce8
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 0 deletions.
87 changes: 87 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dagster import __version__ as dagster_version
from dagster._annotations import deprecated
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
Expand Down Expand Up @@ -197,6 +198,87 @@ async def download_captured_logs_endpoint(self, request: Request):
filebase = "__".join(log_key)
return FileResponse(location, filename=f"{filebase}.{file_extension}")

async def report_asset_materialization_endpoint(self, request: Request) -> Response:
# Record a runless asset materialization event. Properties can be passed as
# json post body or query params, with that order of precedence.

context = self.make_request_context(request)
key_str = request.path_params["asset_key"]
if not key_str:
return JSONResponse(
{
"error": (
"Empty asset key, must provide asset key as url path after"
" /report_asset_materialization/"
),
},
status_code=400,
)

body_content_type = request.headers.get("content-type")

if body_content_type is None:
json_body = {}
elif body_content_type == "application/json":
json_body = await request.json()
else:
return JSONResponse(
{
"error": (
f"Unhandled content type {body_content_type}, expect no body or"
" application/json"
),
},
status_code=400,
)

asset_key = AssetKey(key_str)

description = None
if "description" in json_body:
description = json_body["description"]
elif "description" in request.query_params:
description = request.query_params["description"]

partition = None
if "partition" in json_body:
partition = json_body["partition"]
elif "partition" in request.query_params:
partition = request.query_params["partition"]

metadata = None
if "metadata" in json_body:
metadata = json_body["metadata"]
elif "metadata" in request.query_params:
try:
metadata = json.loads(request.query_params["metadata"])
except Exception as exc:
return JSONResponse(
{
"error": f"Error parsing metadata json: {exc}",
},
status_code=400,
)

try:
mat = AssetMaterialization(
asset_key=asset_key,
description=description,
partition=partition,
metadata=metadata,
)
except Exception as exc:
return JSONResponse(
{
"error": f"Error constructing AssetMaterialization: {exc}",
},
status_code=400,
)

context.instance.report_runless_asset_event(mat)

return JSONResponse({})

def index_html_endpoint(self, request: Request):
"""Serves root html."""
index_path = self.relative_path("webapp/build/index.html")
Expand Down Expand Up @@ -323,6 +405,11 @@ def build_routes(self):
"/download_debug/{run_id:str}",
self.download_debug_file_endpoint,
),
Route(
"/report_asset_materialization/{asset_key:path}",
self.report_asset_materialization_endpoint,
methods=["POST"],
),
Route("/{path:path}", self.index_html_endpoint),
Route("/", self.index_html_endpoint),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import objgraph
import pytest
from dagster import (
DagsterInstance,
__version__ as dagster_version,
job,
op,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.events import DagsterEventType
from dagster._serdes import unpack_value
from dagster._seven import json
Expand Down Expand Up @@ -278,3 +280,76 @@ def test_download_compute(instance, test_client: TestClient):

response = test_client.get(f"/download/{run_id}/jonx/stdout")
assert response.status_code == 404


def test_runless_events(instance: DagsterInstance, test_client: TestClient):
# base case
my_asset_key = "my_asset"
response = test_client.post(f"/report_asset_materialization/{my_asset_key}")
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt

# empty asset key path
response = test_client.post("/report_asset_materialization/")
assert response.status_code == 400

meta = {"my_metadata": "value"}
mat = AssetMaterialization(
asset_key=my_asset_key,
partition="2021-09-23",
description="cutest",
metadata=meta,
)

# kitchen sink json body
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"description": mat.description,
"partition": mat.partition,
"metadata": meta, # handled separately to avoid MetadataValue ish
},
)
assert response.status_code == 200
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# kitchen sink query params
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"description": mat.description,
"partition": mat.partition,
"metadata": json.dumps(meta),
},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt
assert evt.asset_materialization
assert evt.asset_materialization == mat

# bad metadata
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={
"metadata": meta, # not json encoded
},
)
assert response.status_code == 400
assert "Error parsing metadata json" in response.json()["error"]

response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"metadata": "im_just_a_string",
},
)
assert response.status_code == 400
assert (
'Error constructing AssetMaterialization: Param "metadata" is not'
in response.json()["error"]
)

0 comments on commit 46c8ce8

Please sign in to comment.