Skip to content

Commit

Permalink
resolve sonar: class is part of one cycle containing 2 classes
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Oct 14, 2024
1 parent 19c31bb commit b7b1cbe
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.FlowDAGValidator;
import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.ResourceDAGValidator;
import com.weibo.rill.flow.olympicene.spring.boot.exception.OlympicenceStarterException;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGOperationsImpl;
import com.weibo.rill.flow.olympicene.traversal.DAGTraversal;
import com.weibo.rill.flow.olympicene.traversal.Olympicene;
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo;
Expand Down Expand Up @@ -88,7 +88,7 @@ public Stasher stasher() {

@Bean
@ConditionalOnMissingBean(name = "popper")
public Popper popper(@Autowired DAGOperations dagOperations) {
public Popper popper(@Autowired DAGOperationsImpl dagOperations) {
return new DefaultPopper(dagOperations);
}

Expand Down Expand Up @@ -292,7 +292,7 @@ public TimeCheckRunner timeCheckRunner(

@Bean
@ConditionalOnMissingBean(name = "dagOperations")
public DAGOperations dagOperations(
public DAGOperationsImpl dagOperations(
@Autowired @Qualifier("taskRunners") Map<String, TaskRunner> taskRunners,
@Autowired @Qualifier("dagRunner") DAGRunner dagRunner,
@Autowired @Qualifier("dagTraversal") DAGTraversal dagTraversal,
Expand All @@ -301,7 +301,7 @@ public DAGOperations dagOperations(
@Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor,
@Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) {
log.info("begin to init default DAGOperations bean");
DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner,
DAGOperationsImpl dagOperations = new DAGOperationsImpl(runnerExecutor, taskRunners, dagRunner,
timeCheckRunner, dagTraversal, dagCallback, dagResultHandler);
dagTraversal.setDagOperations(dagOperations);
timeCheckRunner.setDagOperations(dagOperations);
Expand All @@ -312,7 +312,7 @@ public DAGOperations dagOperations(
@ConditionalOnMissingBean(name = "olympicene")
public Olympicene olympicene(
@Autowired @Qualifier("dagInfoStorage") DAGInfoStorage dagInfoStorage,
@Autowired @Qualifier("dagOperations") DAGOperations dagOperations,
@Autowired @Qualifier("dagOperations") DAGOperationsImpl dagOperations,
@Autowired @Qualifier("notifyExecutor") ExecutorService notifyExecutor,
@Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) {
log.info("begin to init default Olympicene bean");
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void submitTraversal(String executionId, String completedTaskName) {
Runnable basicActions = () -> dagStorageProcedure.lockAndRun(
LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName));
Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS);
DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
DAGOperationsImpl.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
} catch (Exception e) {
log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e);
}
Expand All @@ -99,7 +99,7 @@ public void submitTasks(String executionId, Set<TaskInfo> taskInfos, Map<String,
runTasks(executionId, taskToContexts);
}
});
DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
DAGOperationsImpl.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
} catch (Exception e) {
log.error("dag {} traversal exception with tasks {}. ", executionId, Joiner.on(",").join(taskInfos.stream().map(TaskInfo::getName).collect(Collectors.toList())), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
@Slf4j
public class Olympicene implements DAGInteraction {
private final DAGInfoStorage dagInfoStorage;
private final DAGOperations dagOperations;
private final DAGOperationsImpl dagOperations;
private final ExecutorService notifyExecutor;
private final DAGResultHandler dagResultHandler;
@Setter
private long dagResultGetTimeoutInMillisecond = 5000;

public Olympicene(DAGInfoStorage dagInfoStorage, DAGOperations dagOperations, ExecutorService notifyExecutor, DAGResultHandler dagResultHandler) {
public Olympicene(DAGInfoStorage dagInfoStorage, DAGOperationsImpl dagOperations, ExecutorService notifyExecutor, DAGResultHandler dagResultHandler) {
this.dagInfoStorage = dagInfoStorage;
this.dagOperations = dagOperations;
this.notifyExecutor = notifyExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGOperationsImpl;
import com.weibo.rill.flow.olympicene.traversal.DAGTraversal;
import com.weibo.rill.flow.olympicene.traversal.Olympicene;
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo;
Expand Down Expand Up @@ -65,7 +65,7 @@ public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage
jsonPathInputOutputMapping, jsonPathInputOutputMapping, dagStorageProcedure, stasher, switcherManager);

DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor);
DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler);
DAGOperationsImpl dagOperations = new DAGOperationsImpl(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler);
dagTraversal.setDagOperations(dagOperations);
dagTraversal.setStasher(stasher);
timeCheckRunner.setDagOperations(dagOperations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package com.weibo.rill.flow.olympicene.traversal.helper;

import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGOperationsImpl;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Collection;
import java.util.Map;

public class DefaultPopper implements Popper {

private DAGOperations dagOperations;
private DAGOperationsImpl dagOperations;

public DefaultPopper(DAGOperations dagOperations) {
public DefaultPopper(DAGOperationsImpl dagOperations) {

Check warning on line 30 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/DefaultPopper.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/DefaultPopper.java#L30

Added line #L30 was not covered by tests
this.dagOperations = dagOperations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGOperationsImpl;
import com.weibo.rill.flow.olympicene.traversal.checker.TimeCheckMember;
import com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
Expand All @@ -54,7 +54,7 @@ public class TimeCheckRunner {
private final DAGInfoStorage dagInfoStorage;
private final DAGContextStorage dagContextStorage;
@Setter
private DAGOperations dagOperations;
private DAGOperationsImpl dagOperations;

public TimeCheckRunner(TimeChecker timeChecker, DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage,
DAGStorageProcedure dagStorageProcedure) {
Expand Down Expand Up @@ -94,7 +94,7 @@ public void handleTimeCheck(String timeCheckMember) {
Map<String, Object> context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo);
dagOperations.runTasks(executionId, Lists.newArrayList(Pair.of(taskInfo, context)));
};
DAGOperations.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes());
DAGOperationsImpl.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes());

Check warning on line 97 in rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java#L97

Added line #L97 was not covered by tests
break;
default:
log.warn("handleTimeCheck time check type nonsupport, type:{}", type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomizedPluginTest extends Specification {

def "notify plugin test"() {
given:
DAGOperations dagOperationsMock = Mock(DAGOperations.class, 'constructorArgs': [null, null, null, null, null, null, null]) as DAGOperations
DAGOperationsImpl dagOperationsMock = Mock(DAGOperationsImpl.class, 'constructorArgs': [null, null, null, null, null, null, null]) as DAGOperationsImpl
Olympicene olympicene = new Olympicene(dagStorage, dagOperationsMock, SameThreadExecutorService.INSTANCE, null)

BiConsumer<Runnable, Map<String, Object>> plugin =
Expand Down Expand Up @@ -93,7 +93,7 @@ class CustomizedPluginTest extends Specification {
Map<String, TaskRunner> taskRunners = [(TaskCategory.FUNCTION.getValue()): functionTaskRunnerMock]
TimeCheckRunner timeCheckRunner = Mock(TimeCheckRunner.class, 'constructorArgs':[null, null, null, null]) as TimeCheckRunner
DAGTraversal dagTraversal = Mock(DAGTraversal.class, 'constructorArgs': [null, null, null, null]) as DAGTraversal
DAGOperations dagOperations = new DAGOperations(SameThreadExecutorService.INSTANCE, taskRunners, dagRunnerMock, timeCheckRunner, dagTraversal, Mock(Callback.class), null)
DAGOperationsImpl dagOperations = new DAGOperationsImpl(SameThreadExecutorService.INSTANCE, taskRunners, dagRunnerMock, timeCheckRunner, dagTraversal, Mock(Callback.class), null)

BiFunction<Supplier<ExecutionResult>, Map<String, Object>, ExecutionResult> plugin =
({ nextActions, params ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class TimeCheckerTest extends Specification {
dag.getTasks().get(0).getTimeline().setTimeoutInSeconds(taskTimeout)

TimeCheckRunner timeCheckRunner = Mock(TimeCheckRunner.class, 'constructorArgs': [null, null, null, null]) as TimeCheckRunner
DAGOperations dagOperations = new DAGOperations(olympicene.dagOperations.runnerExecutor, olympicene.dagOperations.taskRunners, olympicene.dagOperations.dagRunner,
DAGOperationsImpl dagOperations = new DAGOperationsImpl(olympicene.dagOperations.runnerExecutor, olympicene.dagOperations.taskRunners, olympicene.dagOperations.dagRunner,
timeCheckRunner, olympicene.dagOperations.dagTraversal, olympicene.dagOperations.callback, olympicene.dagOperations.dagResultHandler)
dagOperations.dagTraversal.setDagOperations(dagOperations)
Olympicene olympiceneTimeMock = new Olympicene(olympicene.dagInfoStorage, dagOperations, olympicene.notifyExecutor, olympicene.dagResultHandler)
Expand Down

0 comments on commit b7b1cbe

Please sign in to comment.