Skip to content

Commit

Permalink
feat(core): refactor labels handling
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Oct 25, 2024
1 parent 2b72a82 commit 791ced8
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 152 deletions.
62 changes: 5 additions & 57 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
Expand All @@ -39,8 +40,6 @@
import java.util.stream.Stream;
import java.util.zip.CRC32;

import static io.kestra.core.models.Label.SYSTEM_PREFIX;

@Builder(toBuilder = true)
@Slf4j
@Getter
Expand Down Expand Up @@ -143,10 +142,7 @@ public static Execution newExecution(final Flow flow,
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.build();

List<Label> executionLabels = new ArrayList<>();
if (flow.getLabels() != null) {
executionLabels.addAll(flow.getLabels());
}
List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));

if (labels != null) {
executionLabels.addAll(labels);
Expand All @@ -162,6 +158,8 @@ public static Execution newExecution(final Flow flow,
return execution;
}



public static class ExecutionBuilder {
void prebuild() {
this.originalId = this.id;
Expand All @@ -181,12 +179,6 @@ public Execution build() {
this.prebuild();
return super.build();
}

@Override
public ExecutionBuilder labels(List<Label> labels) {
checkForSystemLabels(labels);
return super.labels(labels);
}
}

public Execution withState(State.Type state) {
Expand All @@ -212,53 +204,9 @@ public Execution withState(State.Type state) {
);
}

/**
* This method replaces labels with new ones.
* It refuses system labels as they must be passed via the withSystemLabels method.
*/
public Execution withLabels(List<Label> labels) {
checkForSystemLabels(labels);

return new Execution(
this.tenantId,
this.id,
this.namespace,
this.flowId,
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
labels,
this.variables,
this.state,
this.parentId,
this.originalId,
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate,
this.error
);
}

private static void checkForSystemLabels(List<Label> labels) {
if (labels != null) {
Optional<Label> first = labels.stream().filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().key() + "=" + first.get().value());
}
}
}

/**
* This method in <b>only to be used</b> to add system labels to an execution.
* It will not replace exisiting labels but add new one (possibly duplicating).
*/
public Execution withSystemLabels(List<Label> labels) {
List<Label> newLabels = this.labels == null ? new ArrayList<>() : this.labels;
if (labels != null) {
newLabels.addAll(labels);
}
return new Execution(
this.tenantId,
this.id,
Expand All @@ -268,7 +216,7 @@ public Execution withSystemLabels(List<Label> labels) {
this.taskRunList,
this.inputs,
this.outputs,
newLabels,
labels,
this.variables,
this.state,
this.parentId,
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
);

// propagate system labels and compute correlation ID if not already existing
List<Label> systemLabels = Streams.of(currentExecution.getLabels())
List<Label> newLabels = Streams.of(currentExecution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
.collect(Collectors.toList());
if (systemLabels.stream().noneMatch(label -> label.key().equals(Label.CORRELATION_ID))) {
systemLabels.add(new Label(Label.CORRELATION_ID, currentExecution.getId()));
if (newLabels.stream().noneMatch(label -> label.key().equals(Label.CORRELATION_ID))) {
newLabels.add(new Label(Label.CORRELATION_ID, currentExecution.getId()));
}
if (labels != null) {
newLabels.addAll(labels);
}

FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Expand All @@ -107,16 +110,15 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
.newExecution(
flow,
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
labels,
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.build()
)
.withScheduleDate(scheduleOnDate)
.withSystemLabels(systemLabels);
.withScheduleDate(scheduleOnDate);
return SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.utils.Await;
Expand Down Expand Up @@ -361,11 +362,11 @@ private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execu
);
}

var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
if (flowLabels != null) {
var flow = workerTrigger.getConditionContext().getFlow();
if (flow.getLabels() != null) {
evaluate = evaluate.map(execution -> {
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
executionLabels.addAll(flowLabels);
executionLabels.addAll(LabelService.labelsExcludingSystem(flow));
return execution.withLabels(executionLabels);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ private void handle() {
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getFlow().getRevision())
.labels(f.getFlow().getLabels())
.labels(LabelService.labelsExcludingSystem(f.getFlow()))
.state(new State().withState(State.Type.FAILED))
.error(ExecutionError.from(ie))
.build();
Expand Down
56 changes: 56 additions & 0 deletions core/src/main/java/io/kestra/core/services/LabelService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.kestra.core.services;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ListUtils;

import java.util.ArrayList;
import java.util.List;

public final class LabelService {
private LabelService() {}

/**
* Return flow labels excluding system labels.
*/
public static List<Label> labelsExcludingSystem(Flow flow) {
return ListUtils.emptyOnNull(flow.getLabels()).stream().filter(label -> !label.key().startsWith(Label.SYSTEM_PREFIX)).toList();
}

/**
* Return flow labels excluding system labels concatenated with trigger labels.
*
* Trigger labels will be rendered via the run context but not flow labels.
* In case rendering is not possible, the label will be omitted.
*/
public static List<Label> fromTrigger(RunContext runContext, Flow flow, AbstractTrigger trigger) {
final List<Label> labels = new ArrayList<>();

if (flow.getLabels() != null) {
labels.addAll(LabelService.labelsExcludingSystem(flow)); // no need for rendering
}

if (trigger.getLabels() != null) {
for (Label label : trigger.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

private static String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}
}
14 changes: 11 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/execution/Labels.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.models.Label.SYSTEM_PREFIX;
import static io.kestra.core.utils.Rethrow.throwBiConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

Expand All @@ -35,7 +36,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to add or overwrite labels for the current execution at runtime."
title = "Allow to add or overwrite labels for the current execution at runtime.",
description = "Trying to pass a system label (a label starting with `system_`) will fail the task."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -131,11 +133,17 @@ public Execution update(Execution execution, RunContext runContext) throws Excep
);
}));

// check for system labels: none can be passed at runtime
Optional<Map.Entry<String, String>> first = newLabels.entrySet().stream().filter(entry -> entry.getKey().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().getKey() + "=" + first.get().getValue());
}

return execution.withLabels(newLabels.entrySet().stream()
.map(throwFunction(entry -> new Label(
.map(entry -> new Label(
entry.getKey(),
entry.getValue()
)))
))
.toList());
} else {
throw new IllegalVariableEvaluationException("Unknown value type: " + labels.getClass());
Expand Down
35 changes: 4 additions & 31 deletions core/src/main/java/io/kestra/plugin/core/trigger/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.services.LabelService;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -60,7 +61,7 @@
outputs:
- id: last_ingested_date
type: STRING
value: "{{ outputs.final_date.value }}"
value: "{{ outputs.final_date.value }}"
```
Below is the `transform` flow triggered in response to the `extract` flow's successful completion.""",
code = """
Expand All @@ -74,7 +75,7 @@
variables:
result: |
Ingestion done in {{ trigger.executionId }}.
Ingestion done in {{ trigger.executionId }}.
Now transforming data up to {{ inputs.last_ingested_date }}
tasks:
Expand Down Expand Up @@ -144,7 +145,7 @@ public Optional<Execution> evaluate(RunContext runContext, io.kestra.core.models
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.labels(generateLabels(runContext, flow))
.labels(LabelService.fromTrigger(runContext, flow, this))
.state(new State())
.trigger(ExecutionTrigger.of(
this,
Expand Down Expand Up @@ -181,34 +182,6 @@ public Optional<Execution> evaluate(RunContext runContext, io.kestra.core.models
}
}

private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
final List<Label> labels = new ArrayList<>();

if (flow.getLabels() != null) {
labels.addAll(flow.getLabels()); // no need for rendering
}

if (this.getLabels() != null) {
for (Label label : this.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

private String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}

@Builder
@ToString
@EqualsAndHashCode
Expand Down
16 changes: 2 additions & 14 deletions core/src/main/java/io/kestra/plugin/core/trigger/Schedule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.ScheduleValidation;
import io.kestra.core.validations.TimezoneId;
Expand Down Expand Up @@ -425,11 +426,7 @@ public Cron parseCron() {
}

private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
List<Label> labels = new ArrayList<>();

if (conditionContext.getFlow().getLabels() != null) {
labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
}
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);

if (backfill != null && backfill.getLabels() != null) {
for (Label label : backfill.getLabels()) {
Expand All @@ -440,15 +437,6 @@ private List<Label> generateLabels(RunContext runContext, ConditionContext condi
}
}

if (this.getLabels() != null) {
for (Label label : this.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

Expand Down
Loading

0 comments on commit 791ced8

Please sign in to comment.