Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core-ee): change Objects.equals for tenant id to prevent NPE #6411

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 77 additions & 52 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal=true, level=AccessLevel. PRIVATE)
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class Execution implements DeletedInterface, TenantInterface {

@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
Expand Down Expand Up @@ -108,7 +109,7 @@ public class Execution implements DeletedInterface, TenantInterface {
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
* @param flow The Flow.
* @param flow The Flow.
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
Expand All @@ -117,17 +118,18 @@ public static Execution newExecution(final Flow flow, final List<Label> labels)
}

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow} and inputs.
* Factory method for constructing a new {@link Execution} object for the given {@link Flow} and
* inputs.
*
* @param flow The Flow.
* @param flow The Flow.
* @param inputs The Flow's inputs.
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
public static Execution newExecution(final Flow flow,
final BiFunction<Flow, Execution, Map<String, Object>> inputs,
final List<Label> labels,
final Optional<ZonedDateTime> scheduleDate) {
final BiFunction<Flow, Execution, Map<String, Object>> inputs,
final List<Label> labels,
final Optional<ZonedDateTime> scheduleDate) {
Execution execution = builder()
.id(IdUtils.create())
.tenantId(flow.getTenantId())
Expand Down Expand Up @@ -156,8 +158,8 @@ public static Execution newExecution(final Flow flow,
}



public static class ExecutionBuilder {

void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
Expand All @@ -171,6 +173,7 @@ public static ExecutionBuilder builder() {
}

private static class CustomExecutionBuilder extends ExecutionBuilder {

@Override
public Execution build() {
this.prebuild();
Expand Down Expand Up @@ -202,7 +205,6 @@ public Execution withState(State.Type state) {

public Execution withLabels(List<Label> labels) {


return new Execution(
this.tenantId,
this.id,
Expand Down Expand Up @@ -234,7 +236,9 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
);

if (!b) {
throw new IllegalStateException("Can't replace taskRun '" + taskRun.getId() + "' on execution'" + this.getId() + "'");
throw new IllegalStateException(
"Can't replace taskRun '" + taskRun.getId() + "' on execution'" + this.getId()
+ "'");
}

return new Execution(
Expand All @@ -258,7 +262,8 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
);
}

public Execution childExecution(String childExecutionId, List<TaskRun> taskRunList, State state) {
public Execution childExecution(String childExecutionId, List<TaskRun> taskRunList,
State state) {
return new Execution(
this.tenantId,
childExecutionId != null ? childExecutionId : this.getId(),
Expand Down Expand Up @@ -292,55 +297,63 @@ public List<TaskRun> findTaskRunsByTaskId(String id) {
}

public TaskRun findTaskRunByTaskRunId(String id) throws InternalException {
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList() : this.taskRunList)
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList()
: this.taskRunList)
.stream()
.filter(taskRun -> taskRun.getId().equals(id))
.findFirst();

if (find.isEmpty()) {
throw new InternalException("Can't find taskrun with taskrunId '" + id + "' on execution '" + this.id + "' " + this.toStringState());
throw new InternalException(
"Can't find taskrun with taskrunId '" + id + "' on execution '" + this.id + "' "
+ this.toStringState());
}

return find.get();
}

public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values) throws InternalException {
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList() : this.taskRunList)
public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
throws InternalException {
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList()
: this.taskRunList)
.stream()
.filter(taskRun -> taskRun.getTaskId().equals(id) && findParentsValues(taskRun, true).equals(values))
.filter(taskRun -> taskRun.getTaskId().equals(id) && findParentsValues(taskRun,
true).equals(values))
.findFirst();

if (find.isEmpty()) {
throw new InternalException("Can't find taskrun with taskrunId '" + id + "' & value '" + values + "' on execution '" + this.id + "' " + this.toStringState());
throw new InternalException(
"Can't find taskrun with taskrunId '" + id + "' & value '" + values
+ "' on execution '" + this.id + "' " + this.toStringState());
}

return find.get();
}

/**
* Determine if the current execution is on error &amp; normal tasks
* Used only from the flow
* Determine if the current execution is on error &amp; normal tasks Used only from the flow
*
* @param resolvedTasks normal tasks
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks, List<ResolvedTask> resolvedErrors) {
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, null);
}

/**
* Determine if the current execution is on error &amp; normal tasks
* <p>
* if the current have errors, return tasks from errors
* if not, return the normal tasks
* if the current have errors, return tasks from errors if not, return the normal tasks
*
* @param resolvedTasks normal tasks
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param parentTaskRun the parent task
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks, @Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);

Expand Down Expand Up @@ -377,7 +390,8 @@ private List<ResolvedTask> removeDisabled(List<ResolvedTask> tasks) {
.toList();
}

public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks,
TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return Collections.emptyList();
}
Expand All @@ -387,7 +401,8 @@ public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRu
.stream()
.filter(t -> resolvedTasks
.stream()
.anyMatch(resolvedTask -> FlowableUtils.isTaskRunFor(resolvedTask, t, parentTaskRun))
.anyMatch(
resolvedTask -> FlowableUtils.isTaskRunFor(resolvedTask, t, parentTaskRun))
)
.toList();
}
Expand Down Expand Up @@ -503,12 +518,16 @@ public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parent
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream()
.anyMatch(taskRun -> {
ResolvedTask resolvedTask = resolvedTasks.stream().filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst().orElse(null);
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'", taskRun.getId(), parentTaskRun.getId());
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry()) && taskRun.getState().isFailed();
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
&& taskRun.getState().isFailed();
});
}

Expand Down Expand Up @@ -542,7 +561,8 @@ public State.Type guessFinalState(Flow flow) {
return this.guessFinalState(ResolvedTask.of(flow.getTasks()), null, false, false);
}

public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun, boolean allowFailure, boolean allowWarning) {
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun,
boolean allowFailure, boolean allowWarning) {
List<TaskRun> taskRuns = this.findTaskRunByTasks(currentTasks, parentTaskRun);
var state = this
.findLastByState(taskRuns, State.Type.KILLED)
Expand Down Expand Up @@ -592,7 +612,8 @@ public boolean hasTaskRunJoinable(TaskRun taskRun) {
// attempts & retry need to be saved
if (
(current.getAttempts() == null && taskRun.getAttempts() != null) ||
(current.getAttempts() != null && taskRun.getAttempts() != null && current.getAttempts().size() < taskRun.getAttempts().size())
(current.getAttempts() != null && taskRun.getAttempts() != null
&& current.getAttempts().size() < taskRun.getAttempts().size())
) {
return true;
}
Expand Down Expand Up @@ -620,11 +641,10 @@ public boolean hasTaskRunJoinable(TaskRun taskRun) {
}

/**
* Convert an exception on Executor and add log to the current
* {@code RUNNING} taskRun, on the lastAttempts.
* If no Attempt is found, we create one (must be nominal case).
* The executor will catch the {@code FAILED} taskRun emitted and will failed the execution.
* In the worst case, we FAILED the execution (only from {@link io.kestra.plugin.core.trigger.Flow}).
* Convert an exception on Executor and add log to the current {@code RUNNING} taskRun, on the
* lastAttempts. If no Attempt is found, we create one (must be nominal case). The executor will
* catch the {@code FAILED} taskRun emitted and will failed the execution. In the worst case, we
* FAILED the execution (only from {@link io.kestra.plugin.core.trigger.Flow}).
*
* @param e the exception throw from Executor
* @return a new execution with taskrun failed if possible or execution failed is other case
Expand Down Expand Up @@ -663,7 +683,8 @@ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
}
})
.orElseGet(() -> new FailedExecutionWithLog(
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this,
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED)
: this,
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this))
)
);
Expand All @@ -673,10 +694,11 @@ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
* Create a new attempt for failed worker execution
*
* @param taskRun the task run where we need to add an attempt
* @param e the exception raise
* @param e the exception raise
* @return new taskRun with added attempt
*/
private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun, Exception e) {
private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun,
Exception e) {
return new FailedTaskRunWithLog(
taskRun
.withAttempts(
Expand All @@ -693,12 +715,13 @@ private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun
/**
* Add exception log to last attempts
*
* @param taskRun the task run where we need to add an attempt
* @param taskRun the task run where we need to add an attempt
* @param lastAttempt the lastAttempt found to add
* @param e the exception raise
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) {
private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
TaskRunAttempt lastAttempt, Exception e) {
return new FailedTaskRunWithLog(
taskRun
.withAttempts(
Expand All @@ -717,13 +740,15 @@ private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRu

@Value
public static class FailedTaskRunWithLog {

private TaskRun taskRun;
private List<LogEntry> logs;
}

@Value
@Builder
public static class FailedExecutionWithLog {

private Execution execution;
private List<LogEntry> logs;
}
Expand Down Expand Up @@ -777,7 +802,8 @@ private Map<String, Object> outputs(TaskRun taskRun, Map<String, TaskRun> byIds)
if (taskRun.getValue() == null) {
return Map.of(taskRun.getTaskId(), taskRun.getOutputs());
} else {
return Map.of(taskRun.getTaskId(), Map.of(taskRun.getValue(), taskRun.getOutputs()));
return Map.of(taskRun.getTaskId(),
Map.of(taskRun.getValue(), taskRun.getOutputs()));
}
}

Expand Down Expand Up @@ -828,10 +854,9 @@ public List<Map<String, Object>> parents(TaskRun taskRun) {
}

/**
* Find all parents from this {@link TaskRun}.
* The list is starting from deeper parent and end on the closest parent,
* so the first element is the task that starts first.
* This method doesn't return the current tasks.
* Find all parents from this {@link TaskRun}. The list is starting from deeper parent and end
* on the closest parent, so the first element is the task that starts first. This method
* doesn't return the current tasks.
*
* @param taskRun current child
* @return List of parent {@link TaskRun}
Expand Down Expand Up @@ -864,9 +889,9 @@ public List<TaskRun> findParents(TaskRun taskRun) {
}

/**
* Find all parents from this {@link TaskRun}.
* This method does the same as #findParents(TaskRun taskRun) but for performance reason, as it's called a lot,
* we pre-compute the map of taskrun by ID and use it here.
* Find all parents from this {@link TaskRun}. This method does the same as #findParents(TaskRun
* taskRun) but for performance reason, as it's called a lot, we pre-compute the map of taskrun
* by ID and use it here.
*/
private List<TaskRun> findParents(TaskRun taskRun, Map<String, TaskRun> taskRunById) {
if (taskRun.getParentTaskRunId() == null || taskRunById.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
import java.util.Objects;
import lombok.*;
import lombok.experimental.SuperBuilder;

Expand All @@ -30,7 +30,7 @@ public class ExecutionKilledTrigger extends ExecutionKilled implements TenantInt
String triggerId;

public boolean isEqual(TriggerContext triggerContext) {
return (triggerContext.getTenantId() == null || triggerContext.getTenantId().equals(this.tenantId)) &&
return (triggerContext.getTenantId() == null || Objects.equals(triggerContext.getTenantId(), this.tenantId)) &&
triggerContext.getNamespace().equals(this.namespace) &&
triggerContext.getFlowId().equals(this.flowId) &&
triggerContext.getTriggerId().equals(this.triggerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;

import java.util.Collection;
Expand Down Expand Up @@ -32,7 +33,7 @@ public Collection<FlowWithSource> allLastVersion() {
public Optional<FlowWithSource> findById(String tenantId, String namespace, String id, Optional<Integer> revision) {
Optional<FlowWithSource> find = this.allFlows
.stream()
.filter(flow -> ((flow.getTenantId() == null && tenantId == null) || flow.getTenantId().equals(tenantId)) &&
.filter(flow -> ((flow.getTenantId() == null && tenantId == null) || Objects.equals(flow.getTenantId(), tenantId)) &&
flow.getNamespace().equals(namespace) &&
flow.getId().equals(id) &&
(revision.isEmpty() || revision.get().equals(flow.getRevision()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void failed() throws InterruptedException, TimeoutException, QueueExcepti
});

// first one
Execution execution = runnerUtils.runOne(null, "io.kestra.tests.trigger", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne(null, "io.kestra.tests.trigger.fail", "trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));

Expand All @@ -114,7 +114,7 @@ public void failed() throws InterruptedException, TimeoutException, QueueExcepti
assertThat(listener.get(), nullValue());

// second one
execution = runnerUtils.runOne(null, "io.kestra.tests.trigger", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
execution = runnerUtils.runOne(null, "io.kestra.tests.trigger.fail", "trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

Expand Down
Loading
Loading