Hello everyone, I'm building a POC with Dagster to try out its features and see how it would work. I have a graph that runs on a set schedule (e.g., every day at 00:05). How can I get the date dynamically and pass it through the ops used in the graph? The question was originally asked in Dagster Slack.
Replies: 1 comment
The typical way to do this is through configuration. In this docs example, there are two options for passing that information, run config and run tags, and both can be accessed from the optional context arg (`OpExecutionContext`).
Here's a full code snippet:

```python
from dagster import (
    OpExecutionContext,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    op,
    schedule,
)


@op(config_schema={"scheduled_date": str})
def configurable_op(context: OpExecutionContext):
    # Option 1: read the date from this op's config.
    print(context.op_config["scheduled_date"])
    # Option 2: read the date from the run's tags.
    print(context.get_tag("date"))


@op
def another_op(context: OpExecutionContext):
    # Tags are set on the run, so any op can read them without its own config.
    print(context.get_tag("date"))


@job
def configurable_job():
    configurable_op()
    another_op()


@schedule(
    job=configurable_job, cron_schedule="50 17 * * *", execution_timezone="America/Los_Angeles"
)
def configurable_job_schedule(context: ScheduleEvaluationContext):
    # The time this tick was scheduled for, formatted as YYYY-MM-DD.
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        run_config={
            "ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}  # Option 1
        },
        tags={"date": scheduled_date},  # Option 2
    )
```

For the 00:05 daily schedule in the question, the cron string would be `5 0 * * *`. If you want every op in the job to have access to the same config, the other way to do that is through a resource: https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job
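For the resource route, the date moves out of the per-op `ops` section of the run config and into a single `resources` entry. The following is only a rough sketch of building that dictionary in plain Python, so it can be checked without a running Dagster instance. The helper `build_resource_run_config`, the resource key `run_date`, and its `scheduled_date` config field are all hypothetical names that do not come from the original answer; they would need to match however the resource is actually defined on the job.

```python
from datetime import datetime


def build_resource_run_config(scheduled_time: datetime, resource_key: str = "run_date") -> dict:
    """Build run config that supplies the scheduled date to one resource.

    `resource_key` is a hypothetical name; it must match the key under
    which the job registers its resource.
    """
    # Same formatting as in the schedule function: YYYY-MM-DD.
    scheduled_date = scheduled_time.strftime("%Y-%m-%d")
    return {
        "resources": {resource_key: {"config": {"scheduled_date": scheduled_date}}}
    }


# Example: a tick scheduled for 00:05 on 1 March 2023.
example_config = build_resource_run_config(datetime(2023, 3, 1, 0, 5))
print(example_config)
```

Inside the schedule function, the result would be passed as `run_config` to `RunRequest` in place of the `ops` dictionary. Each op that needs the date would then require that resource instead of declaring its own `config_schema`.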