Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

debug run one task when submit #89

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface DAGInteraction {
/**
* 提交需执行的DAG任务
*/
void submit(String executionId, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo);
void submit(String executionId, String taskName, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo);

/**
* 完成task后调用接口
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

@Slf4j
public class DAGOperations {
public class DAGOperations implements DAGOperationsInterface {
private static final String EXECUTION_ID = "executionId";

private final ExecutorService runnerExecutor;
Expand All @@ -63,20 +62,6 @@ public class DAGOperations {
private final DAGResultHandler dagResultHandler;


public static final BiConsumer<Runnable, Integer> OPERATE_WITH_RETRY = (operation, retryTimes) -> {
int exceptionCatchTimes = retryTimes;
for (int i = 1; i <= exceptionCatchTimes; i++) {
try {
operation.run();
return;
} catch (Exception e) {
log.warn("operateWithRetry fails, invokeTimes:{}", i, e);
}
}

operation.run();
};

public DAGOperations(ExecutorService runnerExecutor, Map<String, TaskRunner> taskRunners, DAGRunner dagRunner,
TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback<DAGCallbackInfo> callback,
DAGResultHandler dagResultHandler) {
Expand Down Expand Up @@ -231,9 +216,9 @@ public void redoTask(String executionId, List<String> taskNames, Map<String, Obj
dagTraversal.submitTraversal(executionId, null);
}

public void submitDAG(String executionId, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
public void submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
ExecutionResult executionResult = dagRunner.submitDAG(executionId, taskName, dag, settings, data, notifyInfo);
Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
.ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
dagTraversal.submitTraversal(executionId, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.weibo.rill.flow.olympicene.traversal;

import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInvokeMsg;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Collection;
import java.util.Map;

/**
 * Subset of DAG operations exposed as an interface.
 *
 * <p>In this change set {@code DAGOperations} implements this interface, and
 * {@code DAGTraversal} and {@code TimeCheckRunner} hold their
 * {@code dagOperations} reference through it rather than through the
 * concrete class.
 */
public interface DAGOperationsInterface {
/**
 * Finishes the DAG execution identified by {@code executionId}.
 *
 * <p>NOTE(review): presumably {@code dagStatus} is the final status to record
 * and {@code dagInvokeMsg} carries the invocation details - confirm against
 * the implementation.
 */
void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg);
/**
 * Reports a task as finished, together with its {@code output} map.
 *
 * <p>NOTE(review): the name suggests the completion is processed
 * asynchronously, in contrast to {@link #finishTaskSync} - confirm against
 * the implementation.
 */
void finishTaskAsync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map<String, Object> output);
/**
 * Reports a task as finished, together with its {@code output} map.
 *
 * <p>NOTE(review): the name suggests the completion is processed on the
 * calling thread, in contrast to {@link #finishTaskAsync} - confirm against
 * the implementation.
 */
void finishTaskSync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map<String, Object> output);
/**
 * Runs the given tasks for the execution identified by {@code executionId}.
 *
 * @param taskInfoToContexts pairs of a task and the context map to run it with
 */
void runTasks(String executionId, Collection<Pair<TaskInfo, Map<String, Object>>> taskInfoToContexts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
import com.weibo.rill.flow.olympicene.traversal.utils.OperationUtil;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -56,7 +57,7 @@ public class DAGTraversal {
private final DAGStorageProcedure dagStorageProcedure;
private final ExecutorService traversalExecutor;
@Setter
private DAGOperations dagOperations;
private DAGOperationsInterface dagOperations;
@Setter
private Stasher stasher;

Expand All @@ -80,7 +81,7 @@ public void submitTraversal(String executionId, String completedTaskName) {
Runnable basicActions = () -> dagStorageProcedure.lockAndRun(
LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName));
Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS);
DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
OperationUtil.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
} catch (Exception e) {
log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e);
}
Expand All @@ -99,7 +100,7 @@ public void submitTasks(String executionId, Set<TaskInfo> taskInfos, Map<String,
runTasks(executionId, taskToContexts);
}
});
DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
OperationUtil.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
} catch (Exception e) {
log.error("dag {} traversal exception with tasks {}. ", executionId, Joiner.on(",").join(taskInfos.stream().map(TaskInfo::getName).collect(Collectors.toList())), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public Olympicene(DAGInfoStorage dagInfoStorage, DAGOperations dagOperations, Ex
* 提交需执行的DAG任务
*/
public void submit(String executionId, DAG dag, Map<String, Object> data) {
submit(executionId, dag, data, DAGSettings.DEFAULT, null);
submit(executionId, null, dag, data, DAGSettings.DEFAULT, null);
}

/**
* 提交需执行的DAG任务
*/
@Override
public void submit(String executionId, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo) {
public void submit(String executionId, String taskName, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo) {
runNotify(executionId, NotifyType.SUBMIT, notifyInfo,
() -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo));
() -> dagOperations.submitDAG(executionId, taskName, dag, settings, data, notifyInfo));
}

public void runNotify(String executionId, NotifyType notifyType, NotifyInfo notifyInfo, Runnable actions) {
Expand Down Expand Up @@ -186,7 +186,7 @@ public DAGResult run(String executionId, DAG dag, Map<String, Object> data, DAGS

dagResultHandler.initEnv(executionId);
doRunNotify(executionId, NotifyType.RUN, notifyInfo,
() -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo));
() -> dagOperations.submitDAG(executionId, null, dag, settings, data, notifyInfo));
return dagResultHandler.getDAGResult(executionId, timeoutInMillisecond);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public DAGRunner(DAGContextStorage dagContextStorage, DAGInfoStorage dagInfoStor
this.dagStorageProcedure = dagStorageProcedure;
}

public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
ExecutionResult ret = ExecutionResult.builder().build();
public ExecutionResult submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
ExecutionResult ret = ExecutionResult.builder().needRetry(false).retryIntervalInSeconds(0).build();

dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> {
DAGInfo currentExecutionIdDagInfo = dagInfoStorage.getBasicDAGInfo(executionId);
Expand Down Expand Up @@ -104,6 +104,15 @@ public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settin
.dagInvokeMsg(dagInvokeMsg)
.dagStatus(DAGStatus.RUNNING)
.make();

if (MapUtils.isNotEmpty(dagInfoToUpdate.getTasks()) && taskName != null) {
for (Map.Entry<String, TaskInfo> taskInfoEntry : dagInfoToUpdate.getTasks().entrySet()) {
if (!taskInfoEntry.getKey().equals(taskName)) {
taskInfoEntry.getValue().setTaskStatus(TaskStatus.SKIPPED);
}
}
}

ret.setDagInfo(dagInfoToUpdate);
Optional.ofNullable(dagInvokeMsg)
.map(DAGInvokeMsg::getExecutionRoutes)
Expand Down Expand Up @@ -259,7 +268,7 @@ public ExecutionResult finishDAG(String executionId, DAGInfo dagInfo, DAGStatus
}

log.info("finishDAG finish, executionId:{}", executionId);
return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).build();
return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).needRetry(false).retryIntervalInSeconds(0).build();
}

private void updateDAGInvokeStartTime(DAGInfo dagInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGOperationsInterface;
import com.weibo.rill.flow.olympicene.traversal.checker.TimeCheckMember;
import com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer;
import com.weibo.rill.flow.olympicene.traversal.utils.OperationUtil;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -54,7 +56,7 @@
private final DAGInfoStorage dagInfoStorage;
private final DAGContextStorage dagContextStorage;
@Setter
private DAGOperations dagOperations;
private DAGOperationsInterface dagOperations;

public TimeCheckRunner(TimeChecker timeChecker, DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage,
DAGStorageProcedure dagStorageProcedure) {
Expand Down Expand Up @@ -94,7 +96,7 @@
Map<String, Object> context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo);
dagOperations.runTasks(executionId, Lists.newArrayList(Pair.of(taskInfo, context)));
};
DAGOperations.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes());
OperationUtil.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes());

Check warning on line 99 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java#L99

Added line #L99 was not covered by tests
break;
default:
log.warn("handleTimeCheck time check type nonsupport, type:{}", type);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.weibo.rill.flow.olympicene.traversal.utils;

import lombok.extern.slf4j.Slf4j;

import java.util.function.ObjIntConsumer;

/**
 * Static helpers for running operations with retry semantics.
 */
@Slf4j
public class OperationUtil {
    /**
     * Runs a {@link Runnable}, retrying it when it throws an {@link Exception}.
     *
     * <p>The second argument is the number of guarded attempts: each of them
     * has its exception caught and logged at warn level. If all guarded
     * attempts fail, the operation is run one more time unguarded, so that the
     * last failure propagates to the caller. The operation is therefore invoked
     * at most {@code retryTimes + 1} times, and exactly once (unguarded) when
     * {@code retryTimes} is zero or negative.
     */
    public static final ObjIntConsumer<Runnable> OPERATE_WITH_RETRY = OperationUtil::runWithRetry;

    /** Utility class; not meant to be instantiated. */
    private OperationUtil() {
    }

    /**
     * Implementation behind {@link #OPERATE_WITH_RETRY}.
     *
     * @param operation  the action to execute
     * @param retryTimes number of guarded attempts made before the final, unguarded one
     */
    private static void runWithRetry(Runnable operation, int retryTimes) {
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            try {
                operation.run();
                // Success: no further attempts are needed.
                return;
            } catch (Exception failure) {
                log.warn("operateWithRetry fails, invokeTimes:{}", attempt, failure);
            }
        }

        // Final attempt is deliberately unguarded so its exception reaches the caller.
        operation.run();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class InvokeMsgTest extends Specification {
.parentDAGTaskInfoName("A")
.parentDAGTaskExecutionType(FunctionPattern.FLOW_SYNC)
.build()
olympicene.submit("smallFlow", smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit)
olympicene.submit("smallFlow", null, smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit)

NotifyInfo smallFlowNotify = NotifyInfo.builder()
.taskInfoName("B_0-C")
Expand Down Expand Up @@ -130,4 +130,35 @@ class InvokeMsgTest extends Specification {
((DAGCallbackInfo) event.getData()).getDagInfo().getTask("A").getTaskInvokeMsg().getOutput() == ['flow_root_execution_id':'bigFlow', 'segments':['gopUrl']]
})
}

// Submits a three-task flow while naming a single task ("pass1") and checks the
// task statuses reported in the DAG-succeeded callback.
def "test submit for debug"() {
given:
// Flow with three "pass" tasks chained pass0 -> pass1 -> pass2 via `next`.
String flowYaml = "workspace: default\n" +
"dagName: testSubmit\n" +
"alias: release\n" +
"type: flow\n" +
"inputSchema: '[]'\n" +
"tasks:\n" +
" - next: pass1\n" +
" name: pass0\n" +
" category: pass\n" +
" - next: pass2\n" +
" name: pass1\n" +
" category: pass\n" +
" - name: pass2\n" +
" category: pass\n"
DAG testFlow = dagParser.parse(flowYaml)
// Stub the dispatcher so any dispatch call returns a fixed execution id payload.
dispatcher.dispatch(*_) >> '{"execution_id":"testFlow"}'

when:
// The second argument is the task name passed to submit.
olympicene.submit("testFlow", "pass1", testFlow, [:], DAGSettings.DEFAULT, null)

then:
// Exactly one DAG_SUCCEED event is expected; in it pass1 has succeeded and
// the other two tasks are marked SKIPPED.
1 * callback.onEvent({Event event ->
event.eventCode == DAGEvent.DAG_SUCCEED.getCode() &&
((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass0").getTaskStatus() == TaskStatus.SKIPPED &&
((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass1").getTaskStatus() == TaskStatus.SUCCEED &&
((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass2").getTaskStatus() == TaskStatus.SKIPPED
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class MultiDAGTest extends Specification {
NotifyInfo notifyInfo = NotifyInfo.builder()
.parentDAGExecutionId("level1")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level2", dag, [:], DAGSettings.DEFAULT, notifyInfo)
olympicene.submit("level2", null, dag, [:], DAGSettings.DEFAULT, notifyInfo)
TaskInfo level1TaskA = dagStorage.getDAGInfo("level1").getTask("A")
DAGInfo level2 = dagStorage.getDAGInfo("level2")

Expand All @@ -87,12 +87,12 @@ class MultiDAGTest extends Specification {
NotifyInfo notifyInfoLevel2 = NotifyInfo.builder()
.parentDAGExecutionId("level1")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level2", dag, [:], dagSettings, notifyInfoLevel2)
olympicene.submit("level2", null, dag, [:], dagSettings, notifyInfoLevel2)

NotifyInfo notifyInfoLevel3 = NotifyInfo.builder()
.parentDAGExecutionId("level2")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level3", dag, [:], dagSettings, notifyInfoLevel3)
olympicene.submit("level3", null, dag, [:], dagSettings, notifyInfoLevel3)

then:
def e = thrown(DAGTraversalException)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.weibo.rill.flow.olympicene.traversal.utils

import spock.lang.Specification
import java.util.concurrent.atomic.AtomicInteger

// Spock specification for OperationUtil.OPERATE_WITH_RETRY.
class OperationUtilTest extends Specification {
// An operation that fails on its first two invocations succeeds on the third
// when three retries are allowed.
def "test OPERATE_WITH_RETRY with retries"() {
given:
AtomicInteger counter = new AtomicInteger(0)
// Throws until the counter reaches 3, i.e. on invocations 1 and 2.
Runnable operation = {
if (counter.incrementAndGet() < 3) {
throw new RuntimeException("Operation failed")
}
}

when:
OperationUtil.OPERATE_WITH_RETRY.accept(operation, 3)

then:
// Two failed attempts plus the successful one; no extra invocation afterwards.
counter.get() == 3
}

// With zero retries the operation is still invoked exactly once.
def "test OPERATE_WITH_RETRY with zero retries"() {
given:
AtomicInteger counter = new AtomicInteger(0)
Runnable operation = {
counter.incrementAndGet()
}

when:
OperationUtil.OPERATE_WITH_RETRY.accept(operation, 0)

then:
counter.get() == 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
DAGSettings dagSettings = DAGSettings.builder()
.ignoreExist(false)
.dagMaxDepth(bizDConfs.getFlowDAGMaxDepth()).build();
olympicene.submit(executionId, dag, data, dagSettings, notifyInfo);
olympicene.submit(executionId, null, dag, data, dagSettings, notifyInfo);

Check warning on line 76 in rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java#L76

Added line #L76 was not covered by tests
dagResourceStatistic.updateFlowTypeResourceStatus(parentDAGExecutionId, parentTaskName, resource.getResourceName(), dag);
ProfileActions.recordTinyDAGSubmit(executionId);
// 记录prometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,24 @@
String businessId = DescriptorIdUtil.changeDescriptorIdToBusinessId(descriptorId);
Map<String, Object> context = dagContextInitializer.newSubmitContextBuilder(businessId).withData(data).withIdentity(descriptorId).build();

return submit(uid, descriptorId, context, callback, resourceCheckConfig);
return submit(uid, descriptorId, null, context, callback, resourceCheckConfig);
};

return profileRecordService.runNotifyAndRecordProfile(url, descriptorId, submitActions);
}


public Map<String, Object> submit(User flowUser, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, context, callback, resourceCheckConfig);
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, null, context, callback, resourceCheckConfig);
}

public Map<String, Object> submit(Long uid, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {
public Map<String, Object> submit(User flowUser, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig, String taskName) {
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, taskName, context, callback, resourceCheckConfig);

Check warning on line 119 in rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java#L119

Added line #L119 was not covered by tests
}

public Map<String, Object> submit(Long uid, String descriptorId, String taskName, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {
DAG dag = dagDescriptorService.getDAG(uid, context, descriptorId);

String executionId = ExecutionIdUtil.generateExecutionId(dag);

dagSubmitChecker.check(executionId, resourceCheckConfig);
Expand All @@ -128,7 +133,7 @@
.build();
}
context.put("flow_execution_id", executionId);
olympicene.submit(executionId, dag, context, DAGSettings.DEFAULT, notifyInfo);
olympicene.submit(executionId, taskName, dag, context, DAGSettings.DEFAULT, notifyInfo);
Map<String, Object> ret = Maps.newHashMap();
ret.put("execution_id", executionId);
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class OlympiceneFacadeTest extends Specification {
user.getUid() >> 1L
expect:
facade.submit(1L, "testBusiness:testFeatureName", new JSONObject(["resourceName": "testCallbackUrl"]).toJSONString(), null, new JSONObject(["a": 1]), null)
facade.submit(1L, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null)
facade.submit(1L, "testBusiness:testFeatureName", null, ["resourceName": "testCallbackUrl"], null, null)
facade.submit(user, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null)
}

Expand Down
Loading
Loading