Skip to content

Commit

Permalink
debug run one task when submit
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Oct 14, 2024
1 parent 0353b75 commit 3f9d73f
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface DAGInteraction {
/**
* 提交需执行的DAG任务
*/
void submit(String executionId, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo);
void submit(String executionId, String taskName, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo);

/**
* 完成task后调用接口
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ public void redoTask(String executionId, List<String> taskNames, Map<String, Obj
dagTraversal.submitTraversal(executionId, null);
}

public void submitDAG(String executionId, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
public void submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
ExecutionResult executionResult = dagRunner.submitDAG(executionId, taskName, dag, settings, data, notifyInfo);
Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
.ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
dagTraversal.submitTraversal(executionId, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public Olympicene(DAGInfoStorage dagInfoStorage, DAGOperations dagOperations, Ex
* 提交需执行的DAG任务
*/
public void submit(String executionId, DAG dag, Map<String, Object> data) {
submit(executionId, dag, data, DAGSettings.DEFAULT, null);
submit(executionId, null, dag, data, DAGSettings.DEFAULT, null);
}

/**
* 提交需执行的DAG任务
*/
@Override
public void submit(String executionId, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo) {
public void submit(String executionId, String taskName, DAG dag, Map<String, Object> data, DAGSettings settings, NotifyInfo notifyInfo) {
runNotify(executionId, NotifyType.SUBMIT, notifyInfo,
() -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo));
() -> dagOperations.submitDAG(executionId, taskName, dag, settings, data, notifyInfo));
}

public void runNotify(String executionId, NotifyType notifyType, NotifyInfo notifyInfo, Runnable actions) {
Expand Down Expand Up @@ -186,7 +186,7 @@ public DAGResult run(String executionId, DAG dag, Map<String, Object> data, DAGS

dagResultHandler.initEnv(executionId);
doRunNotify(executionId, NotifyType.RUN, notifyInfo,
() -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo));
() -> dagOperations.submitDAG(executionId, null, dag, settings, data, notifyInfo));
return dagResultHandler.getDAGResult(executionId, timeoutInMillisecond);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public DAGRunner(DAGContextStorage dagContextStorage, DAGInfoStorage dagInfoStor
this.dagStorageProcedure = dagStorageProcedure;
}

public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
ExecutionResult ret = ExecutionResult.builder().build();
public ExecutionResult submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map<String, Object> data, NotifyInfo notifyInfo) {
ExecutionResult ret = ExecutionResult.builder().needRetry(false).retryIntervalInSeconds(0).build();

dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> {
DAGInfo currentExecutionIdDagInfo = dagInfoStorage.getBasicDAGInfo(executionId);
Expand Down Expand Up @@ -104,6 +104,15 @@ public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settin
.dagInvokeMsg(dagInvokeMsg)
.dagStatus(DAGStatus.RUNNING)
.make();

if (MapUtils.isNotEmpty(dagInfoToUpdate.getTasks()) && taskName != null) {
for (Map.Entry<String, TaskInfo> taskInfoEntry : dagInfoToUpdate.getTasks().entrySet()) {
if (!taskInfoEntry.getKey().equals(taskName)) {
taskInfoEntry.getValue().setTaskStatus(TaskStatus.SKIPPED);

Check warning on line 111 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java#L111

Added line #L111 was not covered by tests
}
}

Check warning on line 113 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java#L113

Added line #L113 was not covered by tests
}

ret.setDagInfo(dagInfoToUpdate);
Optional.ofNullable(dagInvokeMsg)
.map(DAGInvokeMsg::getExecutionRoutes)
Expand Down Expand Up @@ -259,7 +268,7 @@ public ExecutionResult finishDAG(String executionId, DAGInfo dagInfo, DAGStatus
}

log.info("finishDAG finish, executionId:{}", executionId);
return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).build();
return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).needRetry(false).retryIntervalInSeconds(0).build();
}

private void updateDAGInvokeStartTime(DAGInfo dagInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class InvokeMsgTest extends Specification {
.parentDAGTaskInfoName("A")
.parentDAGTaskExecutionType(FunctionPattern.FLOW_SYNC)
.build()
olympicene.submit("smallFlow", smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit)
olympicene.submit("smallFlow", null, smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit)

NotifyInfo smallFlowNotify = NotifyInfo.builder()
.taskInfoName("B_0-C")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class MultiDAGTest extends Specification {
NotifyInfo notifyInfo = NotifyInfo.builder()
.parentDAGExecutionId("level1")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level2", dag, [:], DAGSettings.DEFAULT, notifyInfo)
olympicene.submit("level2", null, dag, [:], DAGSettings.DEFAULT, notifyInfo)
TaskInfo level1TaskA = dagStorage.getDAGInfo("level1").getTask("A")
DAGInfo level2 = dagStorage.getDAGInfo("level2")

Expand All @@ -87,12 +87,12 @@ class MultiDAGTest extends Specification {
NotifyInfo notifyInfoLevel2 = NotifyInfo.builder()
.parentDAGExecutionId("level1")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level2", dag, [:], dagSettings, notifyInfoLevel2)
olympicene.submit("level2", null, dag, [:], dagSettings, notifyInfoLevel2)

NotifyInfo notifyInfoLevel3 = NotifyInfo.builder()
.parentDAGExecutionId("level2")
.parentDAGTaskInfoName("A").build()
olympicene.submit("level3", dag, [:], dagSettings, notifyInfoLevel3)
olympicene.submit("level3", null, dag, [:], dagSettings, notifyInfoLevel3)

then:
def e = thrown(DAGTraversalException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public String handle(Resource resource, DispatchInfo dispatchInfo) {
DAGSettings dagSettings = DAGSettings.builder()
.ignoreExist(false)
.dagMaxDepth(bizDConfs.getFlowDAGMaxDepth()).build();
olympicene.submit(executionId, dag, data, dagSettings, notifyInfo);
olympicene.submit(executionId, null, dag, data, dagSettings, notifyInfo);

Check warning on line 80 in rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java#L80

Added line #L80 was not covered by tests
dagResourceStatistic.updateFlowTypeResourceStatus(parentDAGExecutionId, parentTaskName, resource.getResourceName(), dag);
ProfileActions.recordTinyDAGSubmit(executionId);
// 记录prometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,23 @@ public Map<String, Object> submit(Long uid, String descriptorId, String callback
String businessId = DescriptorIdUtil.changeDescriptorIdToBusinessId(descriptorId);
Map<String, Object> context = dagContextInitializer.newSubmitContextBuilder(businessId).withData(data).withIdentity(descriptorId).build();

return submit(uid, descriptorId, context, callback, resourceCheckConfig);
return submit(uid, descriptorId, null, context, callback, resourceCheckConfig);
};

return profileRecordService.runNotifyAndRecordProfile(url, descriptorId, submitActions);
}


public Map<String, Object> submit(User flowUser, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, context, callback, resourceCheckConfig);
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, null, context, callback, resourceCheckConfig);
}

public Map<String, Object> submit(Long uid, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {

public Map<String, Object> submit(User flowUser, String descriptorId, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig, String taskName) {
return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, taskName, context, callback, resourceCheckConfig);

Check warning on line 123 in rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java#L123

Added line #L123 was not covered by tests
}

public Map<String, Object> submit(Long uid, String descriptorId, String taskName, Map<String, Object> context, String callback, ResourceCheckConfig resourceCheckConfig) {
String dagDescriptor = descriptorManager.getDagDescriptor(uid, context, descriptorId);
DAG dag = dagStringParser.parse(dagDescriptor);
String executionId = ExecutionIdUtil.generateExecutionId(dag);
Expand All @@ -132,7 +137,7 @@ public Map<String, Object> submit(Long uid, String descriptorId, Map<String, Obj
.build();
}
context.put("flow_execution_id", executionId);
olympicene.submit(executionId, dag, context, DAGSettings.DEFAULT, notifyInfo);
olympicene.submit(executionId, taskName, dag, context, DAGSettings.DEFAULT, notifyInfo);
Map<String, Object> ret = Maps.newHashMap();
ret.put("execution_id", executionId);
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class OlympiceneFacadeTest extends Specification {
user.getUid() >> 1L
expect:
facade.submit(1L, "testBusiness:testFeatureName", new JSONObject(["resourceName": "testCallbackUrl"]).toJSONString(), null, new JSONObject(["a": 1]), null)
facade.submit(1L, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null)
facade.submit(1L, "testBusiness:testFeatureName", null, ["resourceName": "testCallbackUrl"], null, null)
facade.submit(user, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class FlowController {
@RequestMapping(value = "submit.json", method = RequestMethod.POST)
public Map<String, Object> submit(User flowUser,
@ApiParam(value = "工作流ID") @RequestParam(value = "descriptor_id") String descriptorId,
@ApiParam(value = "单步执行任务名称") @RequestParam(value = "task_name", required = false) String taskName,
@ApiParam(value = "执行完成后的回调地址") @RequestParam(value = "callback", required = false) String callback,
@ApiParam(value = "用于检测资源是否可用的检测规则") @RequestParam(value = "resource_check", required = false) String resourceCheck,
@ApiParam(value = "工作流执行的context信息") @RequestBody(required = false) JSONObject data) {
Expand Down

0 comments on commit 3f9d73f

Please sign in to comment.