Maven依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>ageiport-processor-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
定义几个导入导出用到的POJO分别对应QUERY/VIEW/DATA三个泛型
/**
 * DATA generic of the sample processors: the business object that the
 * imported rows are converted into. Accessors and toString are generated
 * by Lombok.
 */
@ToString
@Getter
@Setter
public class Data {
// Record identifier.
private Integer id;
// Record name.
private String name;
}
/**
 * QUERY generic of the sample processors: the task parameters that the
 * caller serializes to JSON and passes along when a task is started.
 * Accessors are generated by Lombok.
 */
@Getter
@Setter
public class Query {
// Defaults to 10000 when the caller does not set it.
private Integer totalCount = 10000;
// Rows the sample processor returns as view data from convertAndCheck (simulated check errors).
private List<View> checkErrorData;
// Rows the sample processor returns as view data from write (simulated write errors).
private List<View> writeErrorData;
}
/**
 * VIEW generic of the sample processors: one row of the imported/output
 * file. Each field is bound to a column through the header name declared
 * in its ViewField annotation. Accessors are generated by Lombok.
 */
@Getter
@Setter
public class View {
// Column with header "编码".
@ViewField(headerName = "编码")
private Integer id;
// Column with header "姓名".
@ViewField(headerName = "姓名")
private String name;
}
工具类TestHelper
import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeaderImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeadersImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressResult;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Slf4j
public class TestHelper {
private AgeiPort ageiPort;
public TestHelper(AgeiPort ageiPort) {
this.ageiPort = ageiPort;
}
public String file(String fileName) {
return "." + File.separator + "files" + File.separator + "import-xlsx" + File.separator + fileName;
}
public void assertWithoutFile(String mainTaskId) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
}
public void assertWithFile(String mainTaskId, Integer outputCount) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
//4.任务完成后,查询任务实例信息,任务输出的文件信息
MainTask mainTask = ageiPort.getTaskServerClient().getMainTask(taskProgress.getMainTaskId());
String fileKey = FeatureUtils.getFeature(mainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY);
boolean exists = ageiPort.getFileStore().exists(fileKey, new HashMap<>());
Assertions.assertTrue(exists);
String runtimeParam = mainTask.getRuntimeParam();
String fileType = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_FILE_TYPE_KEY);
String headersString = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_COLUMN_HEADERS_KEY);
List<ColumnHeaderImpl> columnHeaderList = JsonUtil.toArrayObject(headersString, ColumnHeaderImpl.class);
List<ColumnHeader> columnHeaderList1 = new ArrayList<>(columnHeaderList);
ColumnHeadersImpl headers = new ColumnHeadersImpl(columnHeaderList1);
//5.下载文件到本地
InputStream inputStream = ageiPort.getFileStore().get(fileKey, new HashMap<>());
String outputFileReaderFactory = ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType);
final FileReaderFactory factory = ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(outputFileReaderFactory);
//6.断言判断产生的文件是否符合期望
FileReader fileReader = factory.create(ageiPort, mainTask, headers);
fileReader.read(inputStream);
DataGroup dataGroup = fileReader.finish();
int count = 0;
List<DataGroup.Data> data = dataGroup.getData();
for (DataGroup.Data datum : data) {
if (datum.getItems() != null) {
count += datum.getItems().size();
}
}
}
}
更多样例及文档说明:导入样例及文档说明
第0步,准备导入文件
此步骤在生产环境中不需要,一般导入的文件由用户通过Web页面上传
- 项目目录的resources文件夹下创建files/import-xlsx文件夹(与TestHelper#file方法中拼接的路径保持一致)
- 在files/import-xlsx文件夹创建StandaloneImportProcessor.xlsx文件,包含两列:编码、姓名(与View中的字段对应)
- 在StandaloneImportProcessor.xlsx文件添加一些数据
第1步,实现导入Processor
- 实现ImportProcessor接口,并使用上文定义POJO Query、Data、View作为接口的泛型参数
- 实现ImportProcessor接口的convertAndCheck方法,此方法负责检查导入的数据,并将文件中的View对象转为Data对象。返回值为BizImportResult<View, Data>:当无错误数据时BizImportResult只设置Data即可;若有错误数据则还需设置View,View最终会被写到文件中输出给用户。
- 实现ImportProcessor接口的write方法,此方法负责执行写入业务逻辑。返回值为BizImportResult<View, Data>:当无错误数据时无需设置View;若有错误数据则需设置View,View最终会被写到文件中输出给用户。
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.annotation.ImportSpecification;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResult;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResultImpl;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
//1.实现ImportProcessor接口
@ImportSpecification(code = "StandaloneImportProcessor", name = "StandaloneImportProcessor")
public class StandaloneImportProcessor implements ImportProcessor<Query, Data, View> {
Logger logger = LoggerFactory.getLogger(StandaloneImportProcessor.class);
//2.实现ImportProcessor接口的convertAndCheck方法
@Override
public BizImportResult<View, Data> convertAndCheck(BizUser user, Query query, List<View> views) {
BizImportResultImpl<View, Data> result = new BizImportResultImpl<>();
List<Data> data = new ArrayList<>();
for (View view : views) {
Data datum = new Data();
datum.setId(view.getId());
datum.setName(view.getName());
data.add(datum);
}
result.setData(data);
result.setView(query.getCheckErrorData());
return result;
}
//3.实现ExportProcessor接口的write方法,此方法负责执行写入业务逻辑。
@Override
public BizImportResult<View, Data> write(BizUser user, Query query, List<Data> data) {
BizImportResultImpl<View, Data> result = new BizImportResultImpl<>();
logger.info(JsonUtil.toJsonString(data));
result.setView(query.getWriteErrorData());
return result;
}
}
第2步,注册第1步实现的导入Processor
- 项目目录的resources文件夹下创建META-INF文件夹
- META-INF文件夹下创建名为com.alibaba.ageiport.processor.core.Processor的文本文件
- 添加一行配置记录:StandaloneImportProcessor=com.alibaba.ageiport.test.processor.core.importer.StandaloneImportProcessor,其中等号左侧为ImportSpecification中的code,等号右侧为导入实现类的全类名
第3步,运行单元测试,执行任务
此步骤实际生产环境中一般由API触发,是用户从页面点击导入按钮上传文件触发的。
- 初始化AgeiPort实例,在单测中为了尽可能少的依赖外部组件,我们使用AgeiPortOptions.Debug()测试配置来初始化AgeiPort实例。若为生产环境,AgeiPort实例应被维护到应用的上下文中,比如在Spring的Configuration中初始化AgeiPort并作为一个bean存储在Spring上下文中。
- 读取文件并将文件上传到文件存储中,获取文件的Key。
- 构造任务执行参数TaskExecuteParam,必须传入:TaskSpecificationCode,标识某一个任务;BizUserId,标识当前任务的触发人;InputFileKey,为上一小步上传文件后获取到的文件Key;上文定义的Query对象需要以JSON的格式传入。
- 调用本地方法executeTask,开始执行任务,并获取任务实例ID。用户可根据自身技术线路封装远程调用接口。processor包中也包含了可远程执行任务的HTTP API,详情见:API参考文档。
- 使用内部封装的TestHelper中的方法判断任务是否执行成功,TestHelper类主要是封装了一些判断任务执行状态的断言,完整代码在后文中给出
import com.alibaba.ageiport.common.collections.Lists;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.AgeiPortOptions;
import com.alibaba.ageiport.processor.core.spi.service.TaskExecuteParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskExecuteResult;
import com.alibaba.ageiport.test.processor.core.TestHelper;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.util.HashMap;
import java.util.UUID;
/**
 * Runs the StandaloneImportProcessor sample end to end against an AgeiPort instance created
 * with the debug options.
 */
@Slf4j
public class StandaloneImportProcessorTest {

    /** Imports the sample file without configuring any error rows. */
    @SneakyThrows
    @Test
    public void test() {
        // 1. Initialize the AgeiPort instance.
        AgeiPort ageiPort = newDebugAgeiPort();
        TestHelper testHelper = new TestHelper(ageiPort);

        // 2. Read the import file and upload it to the file store.
        String taskCode = StandaloneImportProcessor.class.getSimpleName();
        String fileKey = uploadImportFile(ageiPort, testHelper, taskCode);

        // 3. Build the TaskExecuteParam.
        Query query = new Query();
        query.setTotalCount(100);
        TaskExecuteParam request = newRequest(taskCode, fileKey, query);

        // 4. Start the task through the local executeTask call and get the task id.
        TaskExecuteResult response = ageiPort.getTaskService().executeTask(request);

        // 5. Use TestHelper to assert that the task succeeded.
        Assertions.assertTrue(response.getSuccess());
        testHelper.assertWithoutFile(response.getMainTaskId());
    }

    /** Imports the sample file with one configured check-error row and verifies the output file. */
    @SneakyThrows
    @Test
    public void testHasCheckError() {
        // 1. Initialize the AgeiPort instance.
        AgeiPort ageiPort = newDebugAgeiPort();
        TestHelper testHelper = new TestHelper(ageiPort);

        // 2. Read the import file and upload it to the file store.
        String taskCode = StandaloneImportProcessor.class.getSimpleName();
        String fileKey = uploadImportFile(ageiPort, testHelper, taskCode);

        // 3. Build the TaskExecuteParam; the processor returns checkErrorData as error rows.
        Query query = new Query();
        View view = new View();
        view.setId(1);
        view.setName("name1");
        query.setCheckErrorData(Lists.newArrayList(view));
        query.setTotalCount(100);
        TaskExecuteParam request = newRequest(taskCode, fileKey, query);

        // 4. Start the task through the local executeTask call and get the task id.
        TaskExecuteResult response = ageiPort.getTaskService().executeTask(request);

        // 5. Use TestHelper to assert that the task succeeded and produced the error file.
        //    The Query class shown in this document has no getErrorCount(), so the expected
        //    count is taken from the configured check-error rows.
        Assertions.assertTrue(response.getSuccess());
        testHelper.assertWithFile(response.getMainTaskId(), query.getCheckErrorData().size());
    }

    /** Creates an AgeiPort instance from options that carry the Debug configuration. */
    private AgeiPort newDebugAgeiPort() {
        AgeiPortOptions options = new AgeiPortOptions();
        AgeiPortOptions.Debug debug = new AgeiPortOptions.Debug();
        options.setDebug(debug);
        return AgeiPort.ageiPort(options);
    }

    /**
     * Saves the sample file of the given task code into the file store under a random key
     * and returns that key. Fails with a clear message when the file is not on the classpath
     * and closes the resource stream after saving.
     */
    @SneakyThrows
    private String uploadImportFile(AgeiPort ageiPort, TestHelper testHelper, String taskCode) {
        String filePath = testHelper.file(taskCode + ".xlsx");
        String fileKey = UUID.randomUUID().toString();
        try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(filePath)) {
            Assertions.assertNotNull(inputStream, "Import file not found on the classpath: " + filePath);
            ageiPort.getFileStore().save(fileKey, inputStream, new HashMap<>());
        }
        return fileKey;
    }

    /** Builds the execution request; the query is passed as JSON. */
    private TaskExecuteParam newRequest(String taskCode, String fileKey, Query query) {
        TaskExecuteParam request = new TaskExecuteParam();
        request.setTaskSpecificationCode(taskCode);
        request.setBizUserId("userId");
        request.setBizQuery(JsonUtil.toJsonString(query));
        request.setInputFileKey(fileKey);
        return request;
    }
}
第4步,查询任务进度,下载导出文件,见assertWithFile方法
此步骤生产环境中一般由API触发,在用户触发任务后,页面自动轮询任务进度,最终将生成的文件返回给用户,详见API参考文档。
- 创建进度查询请求参数GetTaskProgressParam,其中MainTaskId为上文创建任务返回的任务ID
- 调用本地方法,查询任务进度。用户可根据自身技术线路封装远程调用接口。processor包中也包含了可远程执行任务的HTTP API,详情见:API参考文档。
- 轮询任务进度,直至任务完成或出错
- 若convertAndCheck和write方法返回的BizImportResult包含View,View数据会被写入到输出文件中
- 若导入有错误数据文件输出,下载文件到本地
- 若已下载错误文件,断言判断错误文件中的数据是否符合预期
import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeaderImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeadersImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressResult;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Slf4j
public class TestHelper {
private AgeiPort ageiPort;
public TestHelper(AgeiPort ageiPort) {
this.ageiPort = ageiPort;
}
public String file(String fileName) {
return "." + File.separator + "files" + File.separator + "import-xlsx" + File.separator + fileName;
}
public void assertWithoutFile(String mainTaskId) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
}
public void assertWithFile(String mainTaskId, Integer outputCount) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
//4.任务完成后,查询任务实例信息,任务输出的文件信息
MainTask mainTask = ageiPort.getTaskServerClient().getMainTask(taskProgress.getMainTaskId());
String fileKey = FeatureUtils.getFeature(mainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY);
boolean exists = ageiPort.getFileStore().exists(fileKey, new HashMap<>());
Assertions.assertTrue(exists);
String runtimeParam = mainTask.getRuntimeParam();
String fileType = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_FILE_TYPE_KEY);
String headersString = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_COLUMN_HEADERS_KEY);
List<ColumnHeaderImpl> columnHeaderList = JsonUtil.toArrayObject(headersString, ColumnHeaderImpl.class);
List<ColumnHeader> columnHeaderList1 = new ArrayList<>(columnHeaderList);
ColumnHeadersImpl headers = new ColumnHeadersImpl(columnHeaderList1);
//5.下载文件到本地
InputStream inputStream = ageiPort.getFileStore().get(fileKey, new HashMap<>());
String outputFileReaderFactory = ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType);
final FileReaderFactory factory = ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(outputFileReaderFactory);
//6.断言判断产生的文件是否符合期望
FileReader fileReader = factory.create(ageiPort, mainTask, headers);
fileReader.read(inputStream);
DataGroup dataGroup = fileReader.finish();
int count = 0;
List<DataGroup.Data> data = dataGroup.getData();
for (DataGroup.Data datum : data) {
if (datum.getItems() != null) {
count += datum.getItems().size();
}
}
}
}
方式一:ImportSpecification中通过executeType属性指定,默认值为"STANDALONE"即为单机执行
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.annotation.ImportSpecification;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResult;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResultImpl;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
/**
 * Sample import processor registered under the code "StandaloneImportProcessor". The
 * annotation does not set executeType, so the default execute type applies.
 *
 * <p>Step 1: implement the ImportProcessor interface with the Query, Data and View types.
 */
@ImportSpecification(code = "StandaloneImportProcessor", name = "StandaloneImportProcessor")
public class StandaloneImportProcessor implements ImportProcessor<Query, Data, View> {

    Logger logger = LoggerFactory.getLogger(StandaloneImportProcessor.class);

    /**
     * Step 2: convertAndCheck of ImportProcessor. Converts every imported row into a Data
     * object and reports the rows configured in {@code query.getCheckErrorData()} as the
     * view (error) rows of the result.
     */
    @Override
    public BizImportResult<View, Data> convertAndCheck(BizUser user, Query query, List<View> views) {
        List<Data> converted = new ArrayList<>();
        views.forEach(row -> converted.add(toData(row)));

        BizImportResultImpl<View, Data> outcome = new BizImportResultImpl<>();
        outcome.setData(converted);
        outcome.setView(query.getCheckErrorData());
        return outcome;
    }

    /**
     * Step 3: write of ImportProcessor, where the business write logic belongs. This sample
     * only logs the data as JSON and reports the rows configured in
     * {@code query.getWriteErrorData()} as the view (error) rows of the result.
     */
    @Override
    public BizImportResult<View, Data> write(BizUser user, Query query, List<Data> data) {
        logger.info(JsonUtil.toJsonString(data));

        BizImportResultImpl<View, Data> outcome = new BizImportResultImpl<>();
        outcome.setView(query.getWriteErrorData());
        return outcome;
    }

    /** Copies the id and name of one imported row into a new Data object. */
    private static Data toData(View row) {
        Data item = new Data();
        item.setId(row.getId());
        item.setName(row.getName());
        return item;
    }
}
方式二:通过实现接口com.alibaba.ageiport.processor.core.task.importer.ImportProcessor#taskRuntimeConfig返回ExecuteType的值为"STANDALONE",可动态设置执行方式,此方式优先级比方式一高
// Way two: choose the execute type at runtime by overriding taskRuntimeConfig.
// The rest of the class body is elided in this snippet.
@ImportSpecification(code = "StandaloneImportProcessor", name = "StandaloneImportProcessor")
public class StandaloneImportProcessor implements ImportProcessor<Query, Data, View> {
...
/**
 * Returns a runtime configuration whose execute type is "STANDALONE".
 * The user and query arguments are not used by this sample.
 */
@Override
public BizImportTaskRuntimeConfig taskRuntimeConfig(BizUser user, Query query) throws BizException {
BizImportTaskRuntimeConfigImpl runtimeConfig = new BizImportTaskRuntimeConfigImpl();
runtimeConfig.setExecuteType("STANDALONE");
return runtimeConfig;
}
}
方式一:ImportSpecification中通过executeType属性指定,设置为"CLUSTER"即为多机执行
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.annotation.ImportSpecification;
import com.alibaba.ageiport.processor.core.constants.ExecuteType;
import com.alibaba.ageiport.processor.core.exception.BizException;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.api.BizImportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.importer.api.BizImportTaskRuntimeConfigImpl;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResult;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResultImpl;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
/**
 * Sample import processor that declares cluster execution through the executeType attribute
 * of its ImportSpecification annotation.
 */
@ImportSpecification(code = "ClusterImportProcessor", name = "ClusterImportProcessor", executeType = ExecuteType.CLUSTER)
public class ClusterImportProcessor implements ImportProcessor<Query, Data, View> {

    Logger logger = LoggerFactory.getLogger(ClusterImportProcessor.class);

    /**
     * Converts every imported row into a Data object and reports the rows configured in
     * {@code query.getCheckErrorData()} as the view (error) rows of the result.
     */
    @Override
    public BizImportResult<View, Data> convertAndCheck(BizUser user, Query query, List<View> views) {
        BizImportResultImpl<View, Data> result = new BizImportResultImpl<>();
        List<Data> data = new ArrayList<>();
        for (View view : views) {
            Data datum = new Data();
            datum.setId(view.getId());
            datum.setName(view.getName());
            data.add(datum);
        }
        result.setData(data);
        result.setView(query.getCheckErrorData());
        return result;
    }

    /**
     * Place for the business write logic. This sample only logs the data as JSON and reports
     * the rows configured in {@code query.getWriteErrorData()} as the view (error) rows.
     */
    @Override
    public BizImportResult<View, Data> write(BizUser user, Query query, List<Data> data) {
        BizImportResultImpl<View, Data> result = new BizImportResultImpl<>();
        logger.info(JsonUtil.toJsonString(data));
        result.setView(query.getWriteErrorData());
        return result;
    }
}
方式二:通过实现接口com.alibaba.ageiport.processor.core.task.importer.ImportProcessor#taskRuntimeConfig返回ExecuteType的值为"CLUSTER",可动态设置执行方式,此方式优先级比方式一高
// Way two: choose the execute type at runtime by overriding taskRuntimeConfig.
// Only the override is shown in this snippet.
@ImportSpecification(code = "ClusterImportProcessor", name = "ClusterImportProcessor", executeType = ExecuteType.CLUSTER)
public class ClusterImportProcessor implements ImportProcessor<Query, Data, View> {

    /**
     * Returns a runtime configuration whose execute type is "CLUSTER".
     * The user and query arguments are not used by this sample.
     */
    @Override
    public BizImportTaskRuntimeConfig taskRuntimeConfig(BizUser user, Query query) throws BizException {
        BizImportTaskRuntimeConfigImpl runtimeConfig = new BizImportTaskRuntimeConfigImpl();
        runtimeConfig.setExecuteType("CLUSTER");
        return runtimeConfig;
    }
}