
[dagster-k8s] working op mutating executor #3

Open · wants to merge 2 commits into master from op-mutating-exec-v2

Conversation

@abhinavDhulipala (Owner) commented Oct 29, 2024

Summary & Motivation

Currently you aren't allowed to dynamically orchestrate a new op in a job executor from a prior op's output, because Dagster draws a pretty hard line between a StepOrchestrationContext and a StepExecutionContext. I initially wanted to simply pass in a custom StepLauncher as a resource that would handle this for me; that level of abstraction would probably be the most sane way to implement this feature going forward. Unfortunately, it would require a much more intrusive rewrite of the current K8sStepHandler, as opposed to the simpler inheritance I have here, and I'm also unsure whether it's possible at all.

To avoid multiple, potentially expensive io_manager calls, we cache step inputs. That way, while a step is running and polling _get_container_context, we don't reinitialize on every call, and we save memory by dropping the entry once the step terminates.

I considered three approaches to this problem, described below.
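Before getting into the approaches, here is a minimal sketch of the step-input caching just mentioned. The names are hypothetical; the actual bookkeeping in this PR lives inside the step handler subclass.

# Minimal sketch (hypothetical names): resolve a step's upstream inputs once,
# reuse the cached result while the step handler keeps polling, and drop the
# entry when the step terminates so memory doesn't grow over the run.
from typing import Any, Callable, Dict

class StepInputCache:
    def __init__(self) -> None:
        self._resolved: Dict[str, Any] = {}

    def get_or_resolve(self, step_key: str, resolve: Callable[[], Any]) -> Any:
        # io_manager loads can be expensive, so only resolve on the first poll
        if step_key not in self._resolved:
            self._resolved[step_key] = resolve()
        return self._resolved[step_key]

    def evict(self, step_key: str) -> None:
        # called when the step terminates
        self._resolved.pop(step_key, None)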

1. Downstream K8sDownstreamOpSpecPipe Resource

I created the K8sDownstreamOpSpecPipe resource: an upstream op would use it to identify its downstream steps and log the configs it wanted to pass on to them. Here is a simple example of how this worked:

@job(executor_def=k8s_op_mutating_executor)
def simple_job():
    @op
    def mock_op(context: OpExecutionContext, pipe: K8sDownstreamOpSpecPipe) -> int:
        # add and commit config like git; the resource ensures you don't commit twice
        # and performs data checks to ensure each entry is a valid k8s config
        pipe.add_downstream_configs(
            context,
            {
                "downstream": {
                    "pod_template_spec_metadata": {
                        "labels": {"example_label_key": "example_label_value"}
                    }
                }
            },
        )
        pipe.commit_downstream_configs(context)
        return 1

    @op(tags={'dagster-k8s/config': {..., 'from_ops': ['mock_op']}})
    def downstream(context: OpExecutionContext, mock_op: int):
        context.log.info(f"received input {mock_op}")

    downstream(mock_op())

Under the hood, this would emit a DagsterEngineEvent that stored the k8s configs as EngineEventData. The event would be logged against the downstream step itself, so querying for the config would be very fast for the step handler, and I created some custom queries to efficiently pull step configs out of the event log. The way I arrived at this solution: I was initially going to create a custom schema for passing op configs down to my mutating executor, but as I developed the schema I noticed it looked a lot like the event schema, which is pretty generalizable.
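For concreteness, the emission boiled down to something roughly like the following. This is a sketch only: report_engine_event, EngineEventData, and MetadataValue are real Dagster APIs, but exact signatures and metadata handling vary by Dagster version, and the actual resource in this PR differs.

# Rough sketch: report an engine event that carries the downstream k8s configs.
# EngineEventData is imported from an internal module; details vary by version.
from dagster import MetadataValue, OpExecutionContext
from dagster._core.events import EngineEventData

def commit_downstream_configs(context: OpExecutionContext, configs: dict) -> None:
    run = context.instance.get_run_by_id(context.run_id)
    context.instance.report_engine_event(
        "k8s downstream op configs",
        run,
        EngineEventData(metadata={"dagster-k8s/downstream-config": MetadataValue.json(configs)}),
    )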

Problems:

  • For dynamic outputs, ahead-of-time step validation and placement were not possible, and I rely heavily on dynamic outputs.
  • Multiple-source resolution became unintuitive and would be a pain to test and reason about.
  • It flooded my logs with EngineEvents.
  • DagsterEventType is probably a Postgres enum under the hood, so I couldn't extend it the way I wanted to for efficient log queries.
  • It scales poorly with engine events. Some of my op graphs are very large, and I expected scanning engine events from multiple steps to scale poorly (debatable).
    • I ended up implementing pagination schemes and querying only current steps, so it performed reasonably well in my stress tests.

2. Output Metadata Consumption

This is very similar to the approach I went with: essentially, use output metadata, if present, to configure the downstream op.

@job(executor_def=k8s_op_mutating_executor)
def simple_job():
    @op
    def mock_op(context: OpExecutionContext) -> Output[int]:
        return Output(1, metadata={
            'dagster-k8s/config': {}  # k8s run configs
        })

    @op(tags={'dagster-k8s/config': {..., 'mutating_op': True}})
    def downstream(context: OpExecutionContext, mock_op: int):
        context.log.info(f"received input {mock_op}")

    downstream(mock_op())

Because we wouldn't want to eagerly load resources we don't need, I checked the op tags and only eagerly loaded parent steps. The main problem was that, after a while of hacking on it, I couldn't cleanly get output metadata from an input context. I had to do some hacky nonsense to get the metadata out of the event log by way of output handles and log queries (sketched below). As such, I marked this approach a failure and moved on.
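For the curious, the hacky version looked roughly like the following. This is illustrative only: all_logs and DagsterEventType are real APIs, but where the output metadata actually lives on the event payload is version-dependent, which is exactly why this felt too fragile.

# Illustrative only: find the upstream STEP_OUTPUT event for a given step/output
# and try to pull metadata off its payload.
from dagster import DagsterEventType, DagsterInstance

def find_upstream_output_metadata(
    instance: DagsterInstance, run_id: str, step_key: str, output_name: str
):
    for record in instance.all_logs(run_id, of_type=DagsterEventType.STEP_OUTPUT):
        event = record.dagster_event
        if event is None or event.step_key != step_key:
            continue
        output_data = event.step_output_data
        if output_data.step_output_handle.output_name != output_name:
            continue
        # the fragile part: digging the metadata out of the payload
        return getattr(output_data, "metadata", None)
    return None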

3. Custom Output Wrapper Type (Implemented)

I created a custom dagster type called K8sOpMutatingOutput. This is a wrapper that can accept a value and a k8s config.

@job(executor_def=k8s_op_mutating_executor)
def simple_prod_sink():
    @op
    def producer() -> K8sOpMutatingOutput:
        return K8sOpMutatingOutput(
            value=1,
            k8s_config={
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "275m", "memory": "32Mi"},
                    }
                }
            },
        )

    @op
    def sink(context: OpExecutionContext, config: SleepConfig, producer: K8sOpMutatingOutput):
        context.log.info(f"sleeping {config.sleep_sec} seconds...")
        sleep(config.sleep_sec)

    sink(producer())

The advantage of this method is that I don't need any extra configs in my op tags. As soon as an op consumes
this type of input, I can recognize it and naturally resolve runtime configs in a way that's immediately apparent
from the job definition (by observing an op's inputs).
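Conceptually the wrapper is just a value paired with a config. A simplified sketch of its shape (the actual class in this PR is registered as a custom Dagster type and differs in detail):

# Simplified sketch of the wrapper's shape; not the actual implementation.
from typing import Any, Mapping, Optional

class K8sOpMutatingOutput:
    """Pairs an op's return value with k8s config to apply to the consuming step."""

    def __init__(self, value: Any, k8s_config: Optional[Mapping[str, Any]] = None):
        self.value = value
        # same shape as the dagster-k8s/config tag (container_config, pod_spec_config, ...)
        self.k8s_config = dict(k8s_config or {})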

Problems:

  • We fake a step execution context, which means the op's required resources are loaded.
    If these resources have destructive side effects, this could lead to unintended consequences
    or miscounting/mislabeling. The step handler now instantiates an execution context in the run launcher pod,
    not the op run pod, and the resources get instantiated twice.

    • As an enhancement, I plan to instantiate only the io manager needed to load the particular input (see the sketch after this list).
      With this enhancement I'd consider it fair, although technically the same problem still exists for custom io managers.
  • During my stress tests I didn't notice a particularly bad hit to step handler performance, but conceivably
    this could get bad, especially with the implementation as it stands today, where a step's entire execution
    context is built for eager loading.

  • If the value you are wrapping is particularly large, this will impact step handler performance.
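The io-manager-only enhancement mentioned in the list above could look roughly like this. It is a sketch built on Dagster's public build_resources / build_input_context / build_output_context utilities; the io manager definition, step key, and output name here are placeholders that the step handler would supply.

# Sketch of the planned enhancement: initialize only the io manager and a
# synthetic input context instead of faking a full step execution context.
from dagster import build_input_context, build_output_context, build_resources

def load_single_input(io_manager_def, run_id: str, step_key: str, output_name: str):
    with build_resources({"io_manager": io_manager_def}) as resources:
        upstream = build_output_context(step_key=step_key, name=output_name, run_id=run_id)
        input_context = build_input_context(upstream_output=upstream)
        return resources.io_manager.load_input(input_context)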

The reason we can get away with this is that, for the steps we employ it on, our resources don't have destructive
or slow side effects and the input payloads are minimal.

Other features

We cache K8sContainerContexts. This adds slightly to the StepHandler's memory overhead, but it is well worth
not resolving the same inputs multiple times. A potential enhancement, for outputs that are consumed repeatedly,
is to implement an LRU cache for rendering particular op output configs as well; a sketch follows.
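A minimal sketch of what that LRU enhancement could look like (hypothetical; not part of this PR):

# Hypothetical bounded LRU cache for rendered per-output k8s configs.
from collections import OrderedDict
from typing import Callable, Dict

class RenderedConfigCache:
    def __init__(self, maxsize: int = 128) -> None:
        self._cache: "OrderedDict[str, Dict]" = OrderedDict()
        self._maxsize = maxsize

    def get_or_render(self, output_key: str, render: Callable[[], Dict]) -> Dict:
        if output_key in self._cache:
            self._cache.move_to_end(output_key)  # mark as recently used
            return self._cache[output_key]
        rendered = render()
        self._cache[output_key] = rendered
        if len(self._cache) > self._maxsize:
            self._cache.popitem(last=False)  # evict least recently used
        return rendered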

We also get neat logging confirming that inputs were picked up. On the op being configured, we see the following log:

[screenshot of the confirmation log]

How I Tested These Changes

  • unit tests:
    • Created a shared IOManager and ran example in-memory jobs. Recreated orchestration contexts for particular steps and used those io managers to mimic execution at a given point in time, pulling the InMemIOManager & KnownState from executed jobs
      to recreate an accurate step handler context (a simplified sketch follows this list).
  • staging and production:
    • Our setup is the standard Helm-based EKS setup, and our code locations have been able to run this executor reliably.
    • Ran stress tests where I launched multiple jobs with 100+ dynamic ops and observed sane performance.
    • Had pods sleep so I could kubectl get pod <op run pod> -oyaml and observe proper propagation of the k8s configs.
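As referenced in the unit-test bullet, the in-memory half of the test setup boils down to something like the following (simplified; the real tests additionally rebuild step handler contexts from the executed job's KnownState, which is omitted here):

# Simplified sketch of the in-memory unit-test setup.
from dagster import job, op

@op
def producer() -> int:
    return 1

@op
def sink(producer: int) -> int:
    return producer + 1

@job
def example_job():
    sink(producer())

def test_example_job_in_memory():
    result = example_job.execute_in_process()
    assert result.success
    assert result.output_for_node("sink") == 2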

Changelog

Insert changelog entry or "NOCHANGELOG" here.

  • NEW (added new feature or capability)
  • BUGFIX (fixed a bug)
  • DOCS (added or updated documentation)

My goal here is at least to gather feedback from the team on this code and its potential to be upstreamed.
I'd love to have this live in the dagster repo, but because of its limitations I understand if it crosses
one too many boundaries. My rationale for why I think this is not a violation of responsibility is the
precedent that dynamic outputs set: if we are OK with a varying number of ops in our graph, I think we can be OK
with them manipulating the state of said ops. Having completed this exercise, though, I can see why these might be
two separate concerns. Would love to hear any and all feedback.

@abhinavDhulipala changed the title from "test mutating executor on simple job" to "[dagster-k8s] working op mutating executor" on Oct 29, 2024
@abhinavDhulipala force-pushed the op-mutating-exec-v2 branch 5 times, most recently from d2a91da to 017187b on October 31, 2024 at 01:45
@abhinavDhulipala force-pushed the op-mutating-exec-v2 branch 11 times, most recently from 0648263 to ddc77e8 on November 9, 2024 at 21:54