From 22219ad154e57dd12b1188407c04b6eb8754f1c2 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Mon, 18 Sep 2023 14:49:45 -0500 Subject: [PATCH] [runless] report_asset_materialization endpoint --- .../dagster_webserver/webserver.py | 125 +++++++++++++ .../webserver/test_app.py | 176 ++++++++++++++++++ python_modules/dagster-webserver/tox.ini | 1 + 3 files changed, 302 insertions(+) diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 0973f43c7b5ac..735d3f610e351 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -8,6 +8,11 @@ from dagster import __version__ as dagster_version from dagster._annotations import deprecated from dagster._core.debug import DebugRunPayload +from dagster._core.definitions.data_version import ( + DATA_VERSION_IS_USER_PROVIDED_TAG, + DATA_VERSION_TAG, +) +from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager @@ -195,6 +200,107 @@ async def download_captured_logs_endpoint(self, request: Request): filebase = "__".join(log_key) return FileResponse(location, filename=f"{filebase}.{file_extension}") + async def report_asset_materialization_endpoint(self, request: Request) -> Response: + # Record a runless asset materialization event. + # The asset key is passed as url path with / delimiting parts or as a query param. + # Properties can be passed as json post body or query params, with that order of precedence. + + context = self.make_request_context(request) + + body_content_type = request.headers.get("content-type") + if body_content_type is None: + json_body = {} + elif body_content_type == "application/json": + json_body = await request.json() + else: + return JSONResponse( + { + "error": ( + f"Unhandled content type {body_content_type}, expect no body or" + " application/json" + ), + }, + status_code=400, + ) + + asset_key = None + if request.path_params.get(ReportAssetMatParam.asset_key): + # use from_user_string to treat / as multipart key separator + asset_key = AssetKey.from_user_string(request.path_params["asset_key"]) + elif ReportAssetMatParam.asset_key in json_body: + asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key]) + elif ReportAssetMatParam.asset_key in request.query_params: + asset_key = AssetKey.from_db_string(request.query_params["asset_key"]) + + if asset_key is None: + return JSONResponse( + { + "error": ( + "Empty asset key, must provide asset key as url path after" + " /report_asset_materialization/ or query param asset_key." + ), + }, + status_code=400, + ) + + tags = None + if ReportAssetMatParam.data_version in json_body: + tags = { + DATA_VERSION_TAG: json_body[ReportAssetMatParam.data_version], + DATA_VERSION_IS_USER_PROVIDED_TAG: "true", + } + elif ReportAssetMatParam.data_version in request.query_params: + tags = { + DATA_VERSION_TAG: request.query_params[ReportAssetMatParam.data_version], + DATA_VERSION_IS_USER_PROVIDED_TAG: "true", + } + + partition = None + if ReportAssetMatParam.partition in json_body: + partition = json_body[ReportAssetMatParam.partition] + elif ReportAssetMatParam.partition in request.query_params: + partition = request.query_params[ReportAssetMatParam.partition] + + description = None + if ReportAssetMatParam.description in json_body: + description = json_body[ReportAssetMatParam.description] + elif ReportAssetMatParam.description in request.query_params: + description = request.query_params[ReportAssetMatParam.description] + + metadata = None + if ReportAssetMatParam.metadata in json_body: + metadata = json_body["metadata"] + elif ReportAssetMatParam.metadata in request.query_params: + try: + metadata = json.loads(request.query_params[ReportAssetMatParam.metadata]) + except Exception as exc: + return JSONResponse( + { + "error": f"Error parsing metadata json: {exc}", + }, + status_code=400, + ) + + try: + mat = AssetMaterialization( + asset_key=asset_key, + partition=partition, + metadata=metadata, + description=description, + tags=tags, + ) + except Exception as exc: + return JSONResponse( + { + "error": f"Error constructing AssetMaterialization: {exc}", + }, + status_code=400, + ) + + context.instance.report_runless_asset_event(mat) + + return JSONResponse({}) + def index_html_endpoint(self, request: Request): """Serves root html.""" index_path = self.relative_path("webapp/build/index.html") @@ -304,6 +410,11 @@ def build_routes(self): "/download_debug/{run_id:str}", self.download_debug_file_endpoint, ), + Route( + "/report_asset_materialization/{asset_key:path}", + self.report_asset_materialization_endpoint, + methods=["POST"], + ), Route("/{path:path}", self.index_html_endpoint), Route("/", self.index_html_endpoint), ] @@ -345,3 +456,17 @@ def send_wrapper(message: Message): return send(message) await self.app(scope, receive, send_wrapper) + + +class ReportAssetMatParam: + """Class to collect all supported args by report_asset_materialization endpoint + to ensure consistency with related APIs. + + note: Enum not used to avoid value type problems X(str, Enum) doesn't work as partition conflicts with keyword + """ + + asset_key = "asset_key" + data_version = "data_version" + metadata = "metadata" + description = "description" + partition = "partition" diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py index 544dd7efa2b32..985d79e70f73e 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py @@ -1,19 +1,28 @@ import gc +import inspect import objgraph import pytest from dagster import ( + DagsterInstance, __version__ as dagster_version, job, op, ) +from dagster._core.definitions.data_version import ( + DATA_VERSION_IS_USER_PROVIDED_TAG, + DATA_VERSION_TAG, +) +from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._core.events import DagsterEventType from dagster._serdes import unpack_value from dagster._seven import json from dagster._utils.error import SerializableErrorInfo +from dagster_ext import ExtContext from dagster_graphql.version import __version__ as dagster_graphql_version from dagster_webserver.graphql import GraphQLWS from dagster_webserver.version import __version__ as dagster_webserver_version +from dagster_webserver.webserver import ReportAssetMatParam from starlette.testclient import TestClient EVENT_LOG_SUBSCRIPTION = """ @@ -278,3 +287,170 @@ def test_download_compute(instance, test_client: TestClient): response = test_client.get(f"/download/{run_id}/jonx/stdout") assert response.status_code == 404 + + +def test_runless_events(instance: DagsterInstance, test_client: TestClient): + # base case + my_asset_key = "my_asset" + response = test_client.post(f"/report_asset_materialization/{my_asset_key}") + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + + # empty asset key path + response = test_client.post("/report_asset_materialization/") + assert response.status_code == 400 + + # multipart key + long_key = AssetKey(["foo", "bar", "baz"]) + response = test_client.post( + # url / delimiter for multipart keys + "/report_asset_materialization/foo/bar/baz", # long_key.to_user_string() + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(long_key) + assert evt + + # slash in key (have to use query param) + slash_key = AssetKey("slash/key") + response = test_client.post( + # have to urlencode / that are part of they key + "/report_asset_materialization/", + params={"asset_key": '["slash/key"]'}, # slash_key.to_string(), + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(slash_key) + assert evt + + # multi part with slashes (have to use query param) + nasty_key = AssetKey(["a/b", "c/d"]) + response = test_client.post( + # have to urlencode / that are part of they key + "/report_asset_materialization/", + json={ + "asset_key": ["a/b", "c/d"], # same args passed to AssetKey + }, + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(nasty_key) + assert evt + + meta = {"my_metadata": "value"} + mat = AssetMaterialization( + asset_key=my_asset_key, + partition="2021-09-23", + description="cutest", + metadata=meta, + tags={ + DATA_VERSION_TAG: "new", + DATA_VERSION_IS_USER_PROVIDED_TAG: "true", + }, + ) + + # kitchen sink json body + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "description": mat.description, + "partition": mat.partition, + "metadata": meta, # handled separately to avoid MetadataValue ish + "data_version": "new", + }, + ) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + assert evt.asset_materialization + assert evt.asset_materialization == mat + + # kitchen sink query params + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={ + "description": mat.description, + "partition": mat.partition, + "metadata": json.dumps(meta), + "data_version": "new", + }, + ) + assert response.status_code == 200, response.json() + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt + assert evt.asset_materialization + assert evt.asset_materialization == mat + + # bad metadata + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={ + "metadata": meta, # not json encoded + }, # type: ignore + ) + assert response.status_code == 400 + assert "Error parsing metadata json" in response.json()["error"] + + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "metadata": "im_just_a_string", + }, + ) + assert response.status_code == 400 + assert ( + 'Error constructing AssetMaterialization: Param "metadata" is not' + in response.json()["error"] + ) + + +def test_report_asset_materialization_apis_consistent( + instance: DagsterInstance, test_client: TestClient +): + # ensure the ext report_asset_materialization and the API endpoint have the same capabilities + sample_payload = { + "asset_key": "sample_key", + "metadata": {"meta": "data"}, + "data_version": "so_new", + "partition": "2023-09-23", + "description": "boo", + } + + # sample has entry for all supported params (banking on usage of enum) + assert set(sample_payload.keys()) == set( + {v for k, v in vars(ReportAssetMatParam).items() if not k.startswith("__")} + ) + + response = test_client.post("/report_asset_materialization/", json=sample_payload) + assert response.status_code == 200 + evt = instance.get_latest_materialization_event(AssetKey(sample_payload["asset_key"])) + assert evt + mat = evt.asset_materialization + assert mat + + for k, v in sample_payload.items(): + if k == "asset_key": + assert mat.asset_key == AssetKey(v) + elif k == "metadata": + assert mat.metadata.keys() == v.keys() + elif k == "data_version": + tags = mat.tags + assert tags + assert tags[DATA_VERSION_TAG] == v + assert tags[DATA_VERSION_IS_USER_PROVIDED_TAG] + elif k == "partition": + assert mat.partition == v + elif k == "description": + assert mat.description == v + else: + assert ( + False + ), "need to add validation that sample payload content was written successfully" + + # all ext report_asset_materialization kwargs should be in sample payload + sig = inspect.signature(ExtContext.report_asset_materialization) + skip_set = {"self"} + params = [p for p in sig.parameters if p not in skip_set] + + # things that are missing on ExtContext.report_asset_materialization + KNOWN_DIFF = {"partition", "description"} + + assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF diff --git a/python_modules/dagster-webserver/tox.ini b/python_modules/dagster-webserver/tox.ini index a8c11df3ae5a0..7c322c4aa620e 100644 --- a/python_modules/dagster-webserver/tox.ini +++ b/python_modules/dagster-webserver/tox.ini @@ -13,6 +13,7 @@ deps = -e ../dagster-pipes -e ../dagster-graphql -e .[notebook,test] + -e ../dagster-ext-process allowlist_externals = /bin/bash