-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[runless] report_asset_materialization endpoint #16602
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,11 @@ | |
from dagster import __version__ as dagster_version | ||
from dagster._annotations import deprecated | ||
from dagster._core.debug import DebugRunPayload | ||
from dagster._core.definitions.data_version import ( | ||
DATA_VERSION_IS_USER_PROVIDED_TAG, | ||
DATA_VERSION_TAG, | ||
) | ||
from dagster._core.definitions.events import AssetKey, AssetMaterialization | ||
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager | ||
from dagster._core.storage.compute_log_manager import ComputeIOType | ||
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager | ||
|
@@ -195,6 +200,107 @@ async def download_captured_logs_endpoint(self, request: Request): | |
filebase = "__".join(log_key) | ||
return FileResponse(location, filename=f"{filebase}.{file_extension}") | ||
|
||
async def report_asset_materialization_endpoint(self, request: Request) -> Response:
    """Record a runless asset materialization event.

    The asset key is taken from the url path (``/`` delimits the parts of a
    multipart key), then from the json post body, then from the ``asset_key``
    query param. All other properties are taken from the json post body first
    and from the query params second.

    Returns:
        An empty json object (status 200) once the event has been reported, or
        a json object with an ``error`` message (status 400) when the request
        cannot be turned into an ``AssetMaterialization``.
    """

    def _bad_request(message: str) -> Response:
        # Every validation failure uses the same {"error": ...} / 400 shape.
        return JSONResponse({"error": message}, status_code=400)

    context = self.make_request_context(request)

    body_content_type = request.headers.get("content-type")
    if body_content_type is None:
        json_body = {}
    elif body_content_type.split(";", 1)[0].strip().lower() == "application/json":
        # Compare only the media type so that parameters such as
        # "; charset=utf-8" do not cause a valid json request to be rejected.
        try:
            json_body = await request.json()
        except ValueError as exc:
            # Malformed (or empty) body: report it instead of failing with a 500.
            return _bad_request(f"Error parsing json body: {exc}")
        if not isinstance(json_body, dict):
            # The lookups below index the body by property name.
            return _bad_request("Expected json body to be an object of request properties.")
    else:
        return _bad_request(
            f"Unhandled content type {body_content_type}, expect no body or application/json"
        )

    def _param(name: str):
        # json body takes precedence over query params; None when absent from both.
        if name in json_body:
            return json_body[name]
        return request.query_params.get(name)

    path_asset_key = request.path_params.get(ReportAssetMatParam.asset_key)
    try:
        if path_asset_key:
            # use from_user_string to treat / as multipart key separator
            asset_key = AssetKey.from_user_string(path_asset_key)
        elif ReportAssetMatParam.asset_key in json_body:
            asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key])
        elif ReportAssetMatParam.asset_key in request.query_params:
            asset_key = AssetKey.from_db_string(
                request.query_params[ReportAssetMatParam.asset_key]
            )
        else:
            asset_key = None
    except Exception as exc:
        # The key comes straight from user input; the exact exception type raised
        # by AssetKey validation is not visible here, so catch broadly at this
        # request boundary, mirroring the AssetMaterialization handling below.
        return _bad_request(f"Error constructing AssetKey: {exc}")

    if asset_key is None:
        return _bad_request(
            "Empty asset key, must provide asset key as url path after"
            " /report_asset_materialization/ or query param asset_key."
        )

    # An explicit json null is treated the same as an omitted data_version.
    data_version = _param(ReportAssetMatParam.data_version)
    tags = None
    if data_version is not None:
        tags = {
            DATA_VERSION_TAG: data_version,
            DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
        }

    partition = _param(ReportAssetMatParam.partition)
    description = _param(ReportAssetMatParam.description)

    # Metadata is structured: used as-is from the json body, but it has to be
    # json-decoded when it arrives as a query param string.
    metadata = None
    if ReportAssetMatParam.metadata in json_body:
        metadata = json_body[ReportAssetMatParam.metadata]
    elif ReportAssetMatParam.metadata in request.query_params:
        try:
            metadata = json.loads(request.query_params[ReportAssetMatParam.metadata])
        except ValueError as exc:
            return _bad_request(f"Error parsing metadata json: {exc}")

    try:
        mat = AssetMaterialization(
            asset_key=asset_key,
            partition=partition,
            metadata=metadata,
            description=description,
            tags=tags,
        )
    except Exception as exc:
        return _bad_request(f"Error constructing AssetMaterialization: {exc}")

    context.instance.report_runless_asset_event(mat)

    return JSONResponse({})
|
||
def index_html_endpoint(self, request: Request): | ||
"""Serves root html.""" | ||
index_path = self.relative_path("webapp/build/index.html") | ||
|
@@ -304,6 +410,11 @@ def build_routes(self): | |
"/download_debug/{run_id:str}", | ||
self.download_debug_file_endpoint, | ||
), | ||
Route( | ||
"/report_asset_materialization/{asset_key:path}", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. differs from |
||
self.report_asset_materialization_endpoint, | ||
methods=["POST"], | ||
), | ||
Route("/{path:path}", self.index_html_endpoint), | ||
Route("/", self.index_html_endpoint), | ||
] | ||
|
@@ -345,3 +456,17 @@ def send_wrapper(message: Message): | |
return send(message) | ||
|
||
await self.app(scope, receive, send_wrapper) | ||
|
||
|
||
class ReportAssetMatParam:
    """Names of every argument accepted by the report_asset_materialization endpoint.

    Keeping the names in one place lets the endpoint and related APIs be
    checked against each other for consistency.

    Note: these are plain class attributes rather than an Enum to avoid value
    type problems; X(str, Enum) does not work because ``partition`` conflicts
    with a keyword.
    """

    # Each attribute's value is the literal json-body / query-param name.
    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,28 @@ | ||
import gc | ||
import inspect | ||
|
||
import objgraph | ||
import pytest | ||
from dagster import ( | ||
DagsterInstance, | ||
__version__ as dagster_version, | ||
job, | ||
op, | ||
) | ||
from dagster._core.definitions.data_version import ( | ||
DATA_VERSION_IS_USER_PROVIDED_TAG, | ||
DATA_VERSION_TAG, | ||
) | ||
from dagster._core.definitions.events import AssetKey, AssetMaterialization | ||
from dagster._core.events import DagsterEventType | ||
from dagster._serdes import unpack_value | ||
from dagster._seven import json | ||
from dagster._utils.error import SerializableErrorInfo | ||
from dagster_ext import ExtContext | ||
from dagster_graphql.version import __version__ as dagster_graphql_version | ||
from dagster_webserver.graphql import GraphQLWS | ||
from dagster_webserver.version import __version__ as dagster_webserver_version | ||
from dagster_webserver.webserver import ReportAssetMatParam | ||
from starlette.testclient import TestClient | ||
|
||
EVENT_LOG_SUBSCRIPTION = """ | ||
|
@@ -278,3 +287,170 @@ def test_download_compute(instance, test_client: TestClient): | |
|
||
response = test_client.get(f"/download/{run_id}/jonx/stdout") | ||
assert response.status_code == 404 | ||
|
||
|
||
def test_runless_events(instance: DagsterInstance, test_client: TestClient):
    """Post to the report_asset_materialization endpoint and check the stored events.

    Covers the ways of passing the asset key (url path, query param, json body),
    a full set of properties via json body and via query params, and the 400
    responses for unusable metadata.
    """
    endpoint_root = "/report_asset_materialization/"

    def latest_event(key: AssetKey):
        return instance.get_latest_materialization_event(key)

    # base case: single-part key in the url path, nothing else
    my_asset_key = "my_asset"
    asset_url = f"/report_asset_materialization/{my_asset_key}"
    resp = test_client.post(asset_url)
    assert resp.status_code == 200
    assert latest_event(AssetKey(my_asset_key))

    # no asset key anywhere is rejected
    resp = test_client.post(endpoint_root)
    assert resp.status_code == 400

    # multipart key: / in the url path delimits the parts
    long_key = AssetKey(["foo", "bar", "baz"])
    resp = test_client.post("/report_asset_materialization/foo/bar/baz")
    assert resp.status_code == 200
    assert latest_event(long_key)

    # a key part that itself contains a slash cannot go in the url path,
    # so it is sent as a query param instead
    slash_key = AssetKey("slash/key")
    resp = test_client.post(endpoint_root, params={"asset_key": '["slash/key"]'})
    assert resp.status_code == 200
    assert latest_event(slash_key)

    # multipart key whose parts contain slashes, sent in the json body
    # using the same value that is passed to AssetKey
    nasty_key = AssetKey(["a/b", "c/d"])
    resp = test_client.post(endpoint_root, json={"asset_key": ["a/b", "c/d"]})
    assert resp.status_code == 200
    assert latest_event(nasty_key)

    # the materialization both kitchen-sink requests below should produce
    meta = {"my_metadata": "value"}
    expected_tags = {
        DATA_VERSION_TAG: "new",
        DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
    }
    mat = AssetMaterialization(
        asset_key=my_asset_key,
        partition="2021-09-23",
        description="cutest",
        metadata=meta,
        tags=expected_tags,
    )

    # kitchen sink json body
    json_payload = {
        "description": mat.description,
        "partition": mat.partition,
        "metadata": meta,  # passed raw rather than read back from mat
        "data_version": "new",
    }
    resp = test_client.post(asset_url, json=json_payload)
    assert resp.status_code == 200
    stored = latest_event(AssetKey(my_asset_key))
    assert stored
    assert stored.asset_materialization
    assert stored.asset_materialization == mat

    # kitchen sink query params (metadata has to be json encoded here)
    query_payload = {
        "description": mat.description,
        "partition": mat.partition,
        "metadata": json.dumps(meta),
        "data_version": "new",
    }
    resp = test_client.post(asset_url, params=query_payload)
    assert resp.status_code == 200, resp.json()
    stored = latest_event(AssetKey(my_asset_key))
    assert stored
    assert stored.asset_materialization
    assert stored.asset_materialization == mat

    # bad metadata: query param that is not json encoded
    resp = test_client.post(
        asset_url,
        params={
            "metadata": meta,
        },  # type: ignore
    )
    assert resp.status_code == 400
    assert "Error parsing metadata json" in resp.json()["error"]

    # bad metadata: json body value of the wrong shape
    resp = test_client.post(asset_url, json={"metadata": "im_just_a_string"})
    assert resp.status_code == 400
    error_message = resp.json()["error"]
    assert 'Error constructing AssetMaterialization: Param "metadata" is not' in error_message
|
||
|
||
def test_report_asset_materialization_apis_consistent(
    instance: DagsterInstance, test_client: TestClient
):
    """Ensure the ext report_asset_materialization and the API endpoint have the same capabilities.

    A sample payload with one entry per supported endpoint param is posted, each
    entry is checked on the stored materialization, and the payload keys are then
    compared against the ExtContext.report_asset_materialization signature.
    """
    sample_payload = {
        "asset_key": "sample_key",
        "metadata": {"meta": "data"},
        "data_version": "so_new",
        "partition": "2023-09-23",
        "description": "boo",
    }

    # sample has an entry for all supported params (relies on ReportAssetMatParam
    # listing every supported param as a class attribute)
    supported_params = {
        value for name, value in vars(ReportAssetMatParam).items() if not name.startswith("__")
    }
    assert set(sample_payload) == supported_params

    response = test_client.post("/report_asset_materialization/", json=sample_payload)
    assert response.status_code == 200
    evt = instance.get_latest_materialization_event(AssetKey(sample_payload["asset_key"]))
    assert evt
    mat = evt.asset_materialization
    assert mat

    for k, v in sample_payload.items():
        if k == "asset_key":
            assert mat.asset_key == AssetKey(v)
        elif k == "metadata":
            # only the keys are compared, not the stored values
            assert mat.metadata.keys() == v.keys()
        elif k == "data_version":
            tags = mat.tags
            assert tags
            assert tags[DATA_VERSION_TAG] == v
            assert tags[DATA_VERSION_IS_USER_PROVIDED_TAG]
        elif k == "partition":
            assert mat.partition == v
        elif k == "description":
            assert mat.description == v
        else:
            # raise explicitly: `assert False` is removed when Python runs with -O,
            # which would let an unvalidated param slip through silently
            raise AssertionError(
                "need to add validation that sample payload content was written successfully"
            )

    # all ext report_asset_materialization kwargs should be in sample payload
    sig = inspect.signature(ExtContext.report_asset_materialization)
    skip_set = {"self"}
    params = [p for p in sig.parameters if p not in skip_set]

    # things that are missing on ExtContext.report_asset_materialization
    # NOTE(review): a reviewer asked whether these should be added; the target was
    # truncated in the review text (presumably ExtContext.report_asset_materialization
    # - confirm).
    KNOWN_DIFF = {"partition", "description"}

    assert set(sample_payload).difference(params) == KNOWN_DIFF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@smackesey to confirm this `data_version` handling