From ce88e4acb970c3d2319acefd83267a34cad39bc7 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Mon, 18 Sep 2023 14:49:45 -0500 Subject: [PATCH] [runless] report_asset_materialization endpoint --- .../dagster_webserver/webserver.py | 94 +++++++++++++++ .../webserver/test_app.py | 109 ++++++++++++++++++ 2 files changed, 203 insertions(+) diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index a6e25ff911585..1826038a644eb 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -8,6 +8,7 @@ from dagster import __version__ as dagster_version from dagster._annotations import deprecated from dagster._core.debug import DebugRunPayload +from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager @@ -197,6 +198,94 @@ async def download_captured_logs_endpoint(self, request: Request): filebase = "__".join(log_key) return FileResponse(location, filename=f"{filebase}.{file_extension}") + async def report_asset_materialization_endpoint(self, request: Request) -> Response: + # Record a runless asset materialization event. + # The asset key is passed as url path with / delimiting parts or as a query param. + # Properties can be passed as json post body or query params, with that order of precedence. + + context = self.make_request_context(request) + + body_content_type = request.headers.get("content-type") + if body_content_type is None: + json_body = {} + elif body_content_type == "application/json": + json_body = await request.json() + else: + return JSONResponse( + { + "error": ( + f"Unhandled content type {body_content_type}, expect no body or" + " application/json" + ), + }, + status_code=400, + ) + + asset_key = None + if request.path_params.get("asset_key"): + # use from_user_string to treat / as multipart key separator + asset_key = AssetKey.from_user_string(request.path_params["asset_key"]) + elif "asset_key" in json_body: + asset_key = AssetKey(json_body["asset_key"]) + elif "asset_key" in request.query_params: + asset_key = AssetKey.from_db_string(request.query_params["asset_key"]) + + if asset_key is None: + return JSONResponse( + { + "error": ( + "Empty asset key, must provide asset key as url path after" + " /report_asset_materialization/ or query param asset_key." + ), + }, + status_code=400, + ) + + description = None + if "description" in json_body: + description = json_body["description"] + elif "description" in request.query_params: + description = request.query_params["description"] + + partition = None + if "partition" in json_body: + partition = json_body["partition"] + elif "partition" in request.query_params: + partition = request.query_params["partition"] + + metadata = None + if "metadata" in json_body: + metadata = json_body["metadata"] + elif "metadata" in request.query_params: + try: + metadata = json.loads(request.query_params["metadata"]) + except Exception as exc: + return JSONResponse( + { + "error": f"Error parsing metadata json: {exc}", + }, + status_code=400, + ) + + try: + mat = AssetMaterialization( + asset_key=asset_key, + description=description, + partition=partition, + metadata=metadata, + ) + except Exception as exc: + return JSONResponse( + { + "error": f"Error constructing AssetMaterialization: {exc}", + }, + status_code=400, + ) + + context.instance.report_runless_asset_event(mat) + + return JSONResponse({}) + def index_html_endpoint(self, request: Request): """Serves root html.""" index_path = self.relative_path("webapp/build/index.html") @@ -323,6 +412,11 @@ def build_routes(self): "/download_debug/{run_id:str}", self.download_debug_file_endpoint, ), + Route( + "/report_asset_materialization/{asset_key:path}", + self.report_asset_materialization_endpoint, + methods=["POST"], + ), Route("/{path:path}", self.index_html_endpoint), Route("/", self.index_html_endpoint), ] diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py index 544dd7efa2b32..651042beaf364 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py @@ -3,10 +3,12 @@ import objgraph import pytest from dagster import ( + DagsterInstance, __version__ as dagster_version, job, op, ) +from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._core.events import DagsterEventType from dagster._serdes import unpack_value from dagster._seven import json @@ -278,3 +280,110 @@ def test_download_compute(instance, test_client: TestClient): response = test_client.get(f"/download/{run_id}/jonx/stdout") assert response.status_code == 404 + + +def test_runless_events(instance: DagsterInstance, test_client: TestClient): + # base case + my_asset_key = "my_asset" + response = test_client.post(f"/report_asset_materialization/{my_asset_key}") + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + + # empty asset key path + response = test_client.post("/report_asset_materialization/") + assert response.status_code == 400 + + # multipart key + long_key = AssetKey(["foo", "bar", "baz"]) + response = test_client.post( + # url / delimiter for multipart keys + "/report_asset_materialization/foo/bar/baz", # long_key.to_user_string() + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(long_key) + assert evt + + # slash in key (have to use query param) + slash_key = AssetKey("slash/key") + response = test_client.post( + # have to urlencode / that are part of they key + "/report_asset_materialization/", + params={"asset_key": '["slash/key"]'}, # slash_key.to_string(), + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(slash_key) + assert evt + + # multi part with slashes (have to use query param) + nasty_key = AssetKey(["a/b", "c/d"]) + response = test_client.post( + # have to urlencode / that are part of they key + "/report_asset_materialization/", + json={ + "asset_key": ["a/b", "c/d"], # same args passed to AssetKey + }, + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(nasty_key) + assert evt + + meta = {"my_metadata": "value"} + mat = AssetMaterialization( + asset_key=my_asset_key, + partition="2021-09-23", + description="cutest", + metadata=meta, + ) + + # kitchen sink json body + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "description": mat.description, + "partition": mat.partition, + "metadata": meta, # handled separately to avoid MetadataValue ish + }, + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + assert evt.asset_materialization + assert evt.asset_materialization == mat + + # kitchen sink query params + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={ + "description": mat.description, + "partition": mat.partition, + "metadata": json.dumps(meta), + }, + ) + assert response.status_code == 200, response.json() + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + assert evt.asset_materialization + assert evt.asset_materialization == mat + + # bad metadata + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={ + "metadata": meta, # not json encoded + }, # type: ignore + ) + assert response.status_code == 400 + assert "Error parsing metadata json" in response.json()["error"] + + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "metadata": "im_just_a_string", + }, + ) + assert response.status_code == 400 + assert ( + 'Error constructing AssetMaterialization: Param "metadata" is not' + in response.json()["error"] + )