Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flow trigger docs #5561

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 66 additions & 38 deletions core/src/main/java/io/kestra/plugin/core/trigger/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,37 +41,67 @@
::"""
)
@Plugin(
examples = @Example(
title = "This flow will be triggered after each successful execution of flow `company.team.trigger_flow` " +
"and forward the `uri` of `my_task` taskId outputs.",
full = true,
code = """
id: trigger_flow_listener
namespace: company.team

inputs:
- id: from_parent
type: STRING

tasks:
- id: only_no_input
type: io.kestra.plugin.core.debug.Return
format: "v1: {{ trigger.executionId }}"

triggers:
- id: listen_flow
type: io.kestra.plugin.core.trigger.Flow
examples = {
@Example(
full = true,
title = """
Trigger the transform flow after the extract flow has successfully completed. \
The extract flow generates a `last_ingested_date` output that is passed to the \
transform flow as an input. Here is the extract flow:
```yaml
id: extract
namespace: company.team

tasks:
- id: final_date
type: io.kestra.plugin.core.debug.Return
format: "{{ execution.startDate | dateAdd(-2, 'DAYS') | date('yyyy-MM-dd') }}"

outputs:
- id: last_ingested_date
type: STRING
value: "{{ outputs.final_date.value }}"
```
Below is the transform flow that is triggered after the extract flow has successfully completed. \
The `last_ingested_date` output from the extract flow is passed to the \
`last_ingested_date` input of the transform flow.""",
code = """
id: transform
namespace: company.team

inputs:
from-parent: '{{ outputs.my_task.uri }}'
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlowCondition
namespace: company.team
flowId: trigger_flow
- type: io.kestra.plugin.core.condition.ExecutionStatusCondition
in:
- SUCCESS
"""
),
- id: last_ingested_date
type: STRING
defaults: "2025-01-01"

variables:
result: |
Ingestion done in {{ trigger.executionId }}.
Now transforming data up to {{ inputs.last_ingested_date }}

tasks:
- id: run_transform
type: io.kestra.plugin.core.debug.Return
format: "{{ render(vars.result) }}"

- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ render(vars.result) }}"

triggers:
- id: run_after_extract
type: io.kestra.plugin.core.trigger.Flow
inputs:
last_ingested_date: "{{ trigger.outputs.last_ingested_date }}"
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlowCondition
namespace: company.team
flowId: extract
- type: io.kestra.plugin.core.condition.ExecutionStatusCondition
in:
- SUCCESS"""
),
},
aliases = "io.kestra.core.models.triggers.types.Flow"
)
public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output> {
Expand All @@ -81,12 +111,11 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>

@Nullable
@Schema(
title = "Fill input of this flow based on output of current flow.",
title = "Pass upstream flow's outputs to inputs of the current flow.",
description = """
Fill input of this flow based on output of current flow, allowing to pass data or file to the triggered flow
The inputs allow you to pass data object or a file to the downstream flow as long as those outputs are defined on the flow-level in the upstream flow.
::alert{type="warning"}
If you provide invalid input, the flow will not be created! Since there is no task started, you can't log any reason that's visible on the Execution UI.
So you will need to go to the Logs tabs on the UI to understand the error.
Make sure that the inputs and task outputs defined in this Flow trigger match the outputs of the upstream flow. Otherwise, the downstream flow execution will not to be created. If that happens, go to the Logs tab on the Flow page to understand the error.
::"""
)
@PluginProperty
Expand All @@ -97,13 +126,12 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
title = "List of execution states that will be evaluated by the trigger",
description = """
By default, only executions in a terminal state will be evaluated.
If you use a condition of type `ExecutionStatusCondition` it will be evaluated after this list.
Any `ExecutionStatusCondition`-type condition will be evaluated after the list of `states`.
::alert{type="info"}
The trigger will be evaluated on each execution state change, this means that, for non-terminal state, they can be observed multiple times.
For example, if a flow has two `Pause` tasks, the execution will transition two times from PAUSED to RUNNING so theses states will be observed two times.
The trigger will be evaluated for each state change of matching executions. Keep in mind that if a flow has two `Pause` tasks, the execution will transition from PAUSED to a RUNNING state twice — one for each Pause task. The Flow trigger listening to a `PAUSED` state will be evaluated twice in this case.
::
::alert{type="warning"}
You cannot evaluate on the CREATED state.
Note that a Flow trigger cannot react to the `CREATED` state.
::"""
)
@Builder.Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ id: trigger-flow-listener-invalid
namespace: io.kestra.tests.trigger

inputs:
- id: from-parent
- id: from_parent
type: STRING
defaults: "default value"

tasks:
- id: only-invalid
type: io.kestra.plugin.core.debug.Return
format: "childs: {{inputs['from-parent']}}"
format: "childs: {{ inputs.from_parent }}"

triggers:
- id: listen-flow-invalid
type: io.kestra.plugin.core.trigger.Flow
inputs:
from-parent: '{{ outputs.invalid.value }}'
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlowCondition
namespace: io.kestra.tests.trigger
flowId: trigger-flow
# inputs:
# from_parent: "{{ trigger.outputs.parent_output }}"
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ id: trigger-flow-listener-namespace-condition
namespace: io.kestra.tests.trigger

inputs:
- id: from-parent
- id: from_parent
type: STRING

tasks:
- id: only-listener
- id: simple_output
type: io.kestra.plugin.core.debug.Return
format: "simple return"

triggers:
- id: listen-flow
- id: parent
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionNamespaceCondition
namespace: io.kestra.tests.trigger
namespace: io.kestra.tests.trigger

outputs:
- id: parent_output
type: STRING
value: "{{ outputs.simple_output.value }}"
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ id: trigger-flow-listener
namespace: io.kestra.tests.trigger

inputs:
- id: from-parent
- id: from_parent
type: STRING
defaults: "default value"

tasks:
- id: only-listener
type: io.kestra.plugin.core.debug.Return
format: "childs: {{inputs['from-parent']}}"
format: "childs: {{ inputs.from_parent }}"

triggers:
- id: listen-flow
type: io.kestra.plugin.core.trigger.Flow
inputs:
from-parent: '{{ outputs.only.value }}'
# inputs:
# from_parent: "{{ trigger.outputs.parent_output }}"
conditions:
- type: io.kestra.plugin.core.condition.ExecutionStatusCondition
in:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ id: trigger-multiplecondition-listener
namespace: io.kestra.tests.trigger
#
#inputs:
# - id: from-parent
# - id: from_parent
# type: STRING

tasks:
- id: only-listener
- id: return_value
type: io.kestra.plugin.core.debug.Return
format: "childs"
format: task in a child flow

triggers:
- id: multiple-listen-flow
type: io.kestra.plugin.core.trigger.Flow
# inputs:
# from-parent: '{{ outputs.only.value }}'
# from_parent: '{{ trigger.outputs.parent_output }}'
conditions:
- id: multiple
type: io.kestra.plugin.core.condition.MultipleCondition
Expand Down
Loading