diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/__init__.py b/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/__init__.py deleted file mode 100644 index e6c954f7f645d..0000000000000 --- a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .custom_run_coordinator import CustomRunCoordinator as CustomRunCoordinator diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator.py b/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator.py deleted file mode 100644 index 6063ecc8afd15..0000000000000 --- a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator.py +++ /dev/null @@ -1,39 +0,0 @@ -import warnings -from base64 import b64decode -from json import JSONDecodeError, loads -from typing import Optional - -from dagster import DagsterRun, QueuedRunCoordinator, SubmitRunContext - - -class CustomRunCoordinator(QueuedRunCoordinator): - # start_email_marker - def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]: - if not jwt_claims_header: - return None - - split_header_tokens = jwt_claims_header.split(".") - if len(split_header_tokens) < 2: - return None - - decoded_claims_json_str = b64decode(split_header_tokens[1]) - try: - claims_json = loads(decoded_claims_json_str) - return claims_json.get("email") - except JSONDecodeError: - return None - - # end_email_marker - - # start_submit_marker - def submit_run(self, context: SubmitRunContext) -> DagsterRun: - dagster_run = context.dagster_run - jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data") - email = self.get_email(jwt_claims_header) - if email: - self._instance.add_run_tags(dagster_run.run_id, {"user": email}) - else: - warnings.warn(f"Couldn't decode JWT header {jwt_claims_header}") - return super().submit_run(context) - - # end_submit_marker diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator_skeleton.py b/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator_skeleton.py deleted file mode 100644 index 324b52e46041c..0000000000000 --- a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/custom_run_coordinator_skeleton.py +++ /dev/null @@ -1,15 +0,0 @@ -# ruff: isort: skip_file - -CUSTOM_HEADER_NAME = "X-SOME-HEADER" -# start_custom_run_coordinator_marker - -from dagster import DagsterRun, QueuedRunCoordinator, SubmitRunContext - - -class CustomRunCoordinator(QueuedRunCoordinator): - def submit_run(self, context: SubmitRunContext) -> DagsterRun: # type: ignore # (didactic) - desired_header = context.get_request_header(CUSTOM_HEADER_NAME) - ... - - -# end_custom_run_coordinator_marker diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/dagster.yaml b/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/dagster.yaml deleted file mode 100644 index 21a31d6353696..0000000000000 --- a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/dagster.yaml +++ /dev/null @@ -1,3 +0,0 @@ -run_coordinator: - module: run_attribution_example - class: CustomRunCoordinator diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/values.yaml b/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/values.yaml deleted file mode 100644 index 1d672296ea8c8..0000000000000 --- a/examples/docs_snippets/docs_snippets/guides/dagster/run_attribution/values.yaml +++ /dev/null @@ -1,9 +0,0 @@ -dagsterDaemon: - runCoordinator: - enabled: true - type: CustomRunCoordinator - config: - customRunCoordinator: - module: run_attribution_example - class: CustomRunCoordinator - config: {} diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/run_attribution_tests/test_custom_run_coordinator.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/run_attribution_tests/test_custom_run_coordinator.py deleted file mode 100644 index c76e585fe719b..0000000000000 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/run_attribution_tests/test_custom_run_coordinator.py +++ /dev/null @@ -1,74 +0,0 @@ -from collections import namedtuple - -import pytest -from dagster_tests.core_tests.run_coordinator_tests.test_queued_run_coordinator import ( - TestQueuedRunCoordinator, -) -from mock import patch - -from dagster._core.run_coordinator import SubmitRunContext -from dagster._core.storage.dagster_run import DagsterRunStatus -from docs_snippets.guides.dagster.run_attribution.custom_run_coordinator import ( - CustomRunCoordinator, -) - - -class TestCustomRunCoordinator(TestQueuedRunCoordinator): - @pytest.fixture(scope="function") - def coordinator(self, instance): - coordinator = CustomRunCoordinator() - coordinator.register_instance(instance) - yield coordinator - - def test_session_header_decode_failure( - self, instance, coordinator, workspace, external_pipeline - ): - run_id = "foo-1" - with patch( - "docs_snippets.guides.dagster.run_attribution.custom_run_coordinator.warnings" - ) as mock_warnings: - run = self.create_run_for_test( - instance, - external_pipeline, - run_id=run_id, - status=DagsterRunStatus.NOT_STARTED, - ) - returned_run = coordinator.submit_run(SubmitRunContext(run, workspace)) - - assert returned_run.run_id == run_id - assert returned_run.status == DagsterRunStatus.QUEUED - mock_warnings.warn.assert_called_once() - assert mock_warnings.warn.call_args.args[0].startswith( - "Couldn't decode JWT header" - ) - - def test_session_header_decode_success( - self, instance, coordinator, workspace, external_pipeline - ): - run_id, jwt_header, expected_email = ( - "foo", - "foo.eyJlbWFpbCI6ICJoZWxsb0BlbGVtZW50bC5jb20ifQ==.bar", - "hello@elementl.com", - ) - MockRequest = namedtuple("MockRequest", ["headers"]) - workspace._source = MockRequest( # noqa: SLF001 - headers={ - "X-Amzn-Trace-Id": "some_info", - "X-Amzn-Oidc-Data": jwt_header, - } - ) - - run = self.create_run_for_test( - instance, - external_pipeline, - run_id=run_id, - status=DagsterRunStatus.NOT_STARTED, - ) - returned_run = coordinator.submit_run(SubmitRunContext(run, workspace)) - - assert returned_run.run_id == run_id - assert returned_run.status == DagsterRunStatus.QUEUED - - fetched_run = instance.get_run_by_id(run_id) - assert len(fetched_run.tags) == 1 - assert fetched_run.tags["user"] == expected_email