Maven依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>ageiport-processor-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
定义几个导入导出用到的POJO分别对应QUERY/VIEW/DATA三个泛型
@ToString
@Getter
@Setter
public class Data {
private Integer id;
private String name;
}
@Getter
@Setter
public class Query {
private Integer totalCount = 10000;
private List<View> checkErrorData;
private List<View> writeErrorData;
}
@Getter
@Setter
public class View {
@ViewField(headerName = "编码")
private Integer id;
@ViewField(headerName = "姓名")
private String name;
}
工具类TaskHelper
import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeaderImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeadersImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressResult;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Slf4j
public class TestHelper {
private AgeiPort ageiPort;
public TestHelper(AgeiPort ageiPort) {
this.ageiPort = ageiPort;
}
public String file(String fileName) {
return "." + File.separator + "files" + File.separator + "import-xlsx" + File.separator + fileName;
}
public void assertWithoutFile(String mainTaskId) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
}
public void assertWithFile(String mainTaskId, Integer outputCount) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
//4.任务完成后,查询任务实例信息,任务输出的文件信息
MainTask mainTask = ageiPort.getTaskServerClient().getMainTask(taskProgress.getMainTaskId());
String fileKey = FeatureUtils.getFeature(mainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY);
boolean exists = ageiPort.getFileStore().exists(fileKey, new HashMap<>());
Assertions.assertTrue(exists);
String runtimeParam = mainTask.getRuntimeParam();
String fileType = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_FILE_TYPE_KEY);
String headersString = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_COLUMN_HEADERS_KEY);
List<ColumnHeaderImpl> columnHeaderList = JsonUtil.toArrayObject(headersString, ColumnHeaderImpl.class);
List<ColumnHeader> columnHeaderList1 = new ArrayList<>(columnHeaderList);
ColumnHeadersImpl headers = new ColumnHeadersImpl(columnHeaderList1);
//5.下载文件到本地
InputStream inputStream = ageiPort.getFileStore().get(fileKey, new HashMap<>());
String outputFileReaderFactory = ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType);
final FileReaderFactory factory = ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(outputFileReaderFactory);
//6.断言判断产生的文件是否符合期望
FileReader fileReader = factory.create(ageiPort, mainTask, headers);
fileReader.read(inputStream);
DataGroup dataGroup = fileReader.finish();
int count = 0;
List<DataGroup.Data> data = dataGroup.getData();
for (DataGroup.Data datum : data) {
if (datum.getItems() != null) {
count += datum.getItems().size();
}
}
}
}
第1步,实现导出Processor
- 实现ExportProcessor接口,并使用上文定义POJO Query、Data、View作为接口的泛型参数
- 实现ExportProcessor接口的TotalCount方法,根据Query返回当前导出的总条数(比如根据Query构造Select Count SQL,并返回Count值)
- 实现ExportProcessor接口的queryData方法,根据Query返回当前分页的实际数据(比如根据Query构造Select ... limit n,m SQL,并返回数据行),如果TotalCount有10000,分片大小默认为1000,则queryData方法会被并行调用10次,入参BizExportPage会携带不同分页信息。
- 实现ExportProcessor接口的convert方法,根据页面展示规则,把queryData返回的Data列表转为View列表。View列表会最终输出给用户。
import com.alibaba.ageiport.common.utils.BeanUtils;
import com.alibaba.ageiport.processor.core.annotation.ExportSpecification;
import com.alibaba.ageiport.processor.core.exception.BizException;
import com.alibaba.ageiport.processor.core.model.api.BizExportPage;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
//1.实现ExportProcessor接口
@ExportSpecification(code = "StandaloneExportProcessor", name = "StandaloneExportProcessor")
public class StandaloneExportProcessor implements ExportProcessor<Query, Data, View> {
//2.实现ExportProcessor接口的TotalCount方法
@Override
public Integer totalCount(BizUser bizUser, Query query) throws BizException {
return query.getTotalCount();
}
//3.实现ExportProcessor接口的queryData方法
@Override
public List<Data> queryData(BizUser user, Query query, BizExportPage bizExportPage) throws BizException {
List<Data> dataList = new ArrayList<>();
Integer totalCount = query.getTotalCount();
for (int i = 1; i <= totalCount; i++) {
final Data data = new Data();
data.setId(i);
data.setName("name" + i);
dataList.add(data);
}
return dataList;
}
//4.实现ExportProcessor接口的convert方法
@Override
public List<View> convert(BizUser user, Query query, List<Data> data) throws BizException {
List<View> dataList = new ArrayList<>();
for (Data datum : data) {
View view = BeanUtils.cloneProp(datum, View.class);
dataList.add(view);
}
return dataList;
}
}
第2步,注册第1步实现的导出Processor
- 项目目录的resources文件夹下创建META-INF文件夹
- META-INF文件夹下创建名为com.alibaba.ageiport.processor.core.Processor的文本文件
- 添加一行配置记录:StandaloneExportProcessor=com.alibaba.ageiport.test.processor.core.exporter.StandaloneExportProcessor,其中等号左侧为ExportSpecification中的code,等号右侧为导出实现类的全类名
第3步,运行单元测试,执行任务
此步骤实际生产环境中一般由API触发,用户在页面点击触发执行导出任务
- 初始化AgeiPort实例,在单测中为了尽可能少的依赖外部组件,我们使用AgeiPortOptions.Debug()测试配置来初始化AgeiPort实例。若为生产环境,AgeiPort实例应被维护到应用的上下文中,比如在Spring的Configuration中初始化AgeiPort并作为一个bean存储在Spring上下文中。
- 构造查询参数TaskExecuteParam,必须传入:TaskSpecificationCode,标识某一个任务);BizUserId标识当前任务的触发人,上文定义的Query对象需要以JSON的格式传入。
- 调用本地方法executeTask,开始执行任务,并获取任务实例ID。用户可根据自身技术线路封装远程调用接口。processor包中也包含了可远程执行任务的HTTP API,详情见:API参考文档。
- 使用内部封装的TaskHelp方法判断任务是否执行成功,TestHelp类主要是封装了一些判断任务执行状态的断言,在后文中给出代码
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.AgeiPortOptions;
import com.alibaba.ageiport.processor.core.spi.service.TaskExecuteParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskExecuteResult;
import com.alibaba.ageiport.test.processor.core.TestHelper;
import com.alibaba.ageiport.test.processor.core.model.Query;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@Slf4j
public class StandaloneExportProcessorTest {
@SneakyThrows
@Test
public void test() {
//1.初始化AgeiPort实例
AgeiPortOptions options = new AgeiPortOptions();
AgeiPortOptions.Debug debug = new AgeiPortOptions.Debug();
options.setDebug(debug);
AgeiPort ageiPort = AgeiPort.ageiPort(options);
//2.构造查询参数TaskExecuteParam
Query query = new Query();
query.setTotalCount(100);
//3.调用本地方法executeTask,开始执行任务,并获取任务实例ID。
TaskExecuteParam request = new TaskExecuteParam();
request.setTaskSpecificationCode(StandaloneExportProcessor.class.getSimpleName());
request.setBizUserId("userId");
request.setBizQuery(JsonUtil.toJsonString(query));
TaskExecuteResult response = ageiPort.getTaskService().executeTask(request);
Assertions.assertTrue(response.getSuccess());
//4.使用内部封装的TaskHelp方法判断任务是否执行成功
TestHelper testHelper = new TestHelper(ageiPort);
testHelper.assertWithFile(response.getMainTaskId(), query.getTotalCount());
}
}
第4步,查询任务进度,下载导出文件,见assertWithFile方法
此步骤生产环境中一般由API触发,在用户触发任务后,页面自动轮询任务进度,最终将生成的文件返回给用户,详见API参考文档。
- 创建进度查询请求参数GetTaskProgressParam,其中MainTaskId为上文创建任务返回的任务ID
- 调用本地方法,查询任务进度。用户可根据自身技术线路封装远程调用接口。processor包中也包含了可远程执行任务的HTTP API,详情见:API参考文档。
- 轮询任务进度,直至任务完成或出错
- 任务完成后,查询任务实例信息,任务输出的文件信息
- 下载文件到本地
- 断言判断产生的文件是否符合期望
import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeaderImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.ColumnHeadersImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressParam;
import com.alibaba.ageiport.processor.core.spi.service.TaskProgressResult;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Slf4j
public class TestHelper {
private AgeiPort ageiPort;
public TestHelper(AgeiPort ageiPort) {
this.ageiPort = ageiPort;
}
public String file(String fileName) {
return "." + File.separator + "files" + File.separator + "import-xlsx" + File.separator + fileName;
}
public void assertWithoutFile(String mainTaskId) throws InterruptedException {
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
}
public void assertWithFile(String mainTaskId, Integer outputCount) throws InterruptedException {
//1.创建进度查询请求参数GetTaskProgressParam
GetTaskProgressParam progressRequest = new GetTaskProgressParam();
progressRequest.setMainTaskId(mainTaskId);
//2.调用本地方法,查询任务进度。
TaskProgressResult taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
int sleepTime = 0;
log.info("getTaskProgress, taskProgress:{}", taskProgress);
//3.轮询任务进度,直至任务完成或出错
while (taskProgress == null || !taskProgress.getIsFinished() && !taskProgress.getIsError()) {
Thread.sleep(1000);
if (sleepTime++ > 100) {
Assertions.assertTrue(taskProgress.getIsFinished() || taskProgress.getIsError());
}
taskProgress = ageiPort.getTaskService().getTaskProgress(progressRequest);
if (taskProgress != null) {
log.info("getTaskProgress, percent:{}, stageName:{}", taskProgress.getPercent(), taskProgress.getStageName());
} else {
log.info("no progress...");
}
}
Assertions.assertTrue(taskProgress.getIsFinished());
Assertions.assertEquals(1, taskProgress.getPercent());
//4.任务完成后,查询任务实例信息,任务输出的文件信息
MainTask mainTask = ageiPort.getTaskServerClient().getMainTask(taskProgress.getMainTaskId());
String fileKey = FeatureUtils.getFeature(mainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY);
boolean exists = ageiPort.getFileStore().exists(fileKey, new HashMap<>());
Assertions.assertTrue(exists);
String runtimeParam = mainTask.getRuntimeParam();
String fileType = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_FILE_TYPE_KEY);
String headersString = FeatureUtils.getFeature(runtimeParam, MainTaskFeatureKeys.RT_COLUMN_HEADERS_KEY);
List<ColumnHeaderImpl> columnHeaderList = JsonUtil.toArrayObject(headersString, ColumnHeaderImpl.class);
List<ColumnHeader> columnHeaderList1 = new ArrayList<>(columnHeaderList);
ColumnHeadersImpl headers = new ColumnHeadersImpl(columnHeaderList1);
//5.下载文件到本地
InputStream inputStream = ageiPort.getFileStore().get(fileKey, new HashMap<>());
String outputFileReaderFactory = ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType);
final FileReaderFactory factory = ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(outputFileReaderFactory);
//6.断言判断产生的文件是否符合期望
FileReader fileReader = factory.create(ageiPort, mainTask, headers);
fileReader.read(inputStream);
DataGroup dataGroup = fileReader.finish();
int count = 0;
List<DataGroup.Data> data = dataGroup.getData();
for (DataGroup.Data datum : data) {
if (datum.getItems() != null) {
count += datum.getItems().size();
}
}
}
}
方式一:ExportSpecification中通过executeType属性指定,默认值为"STANDALONE"即为单机执行
import com.alibaba.ageiport.common.utils.BeanUtils;
import com.alibaba.ageiport.processor.core.annotation.ExportSpecification;
import com.alibaba.ageiport.processor.core.exception.BizException;
import com.alibaba.ageiport.processor.core.model.api.BizExportPage;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
//1.实现ExportProcessor接口
@ExportSpecification(code = "StandaloneExportProcessor", name = "StandaloneExportProcessor")
public class StandaloneExportProcessor implements ExportProcessor<Query, Data, View> {
//2.实现ExportProcessor接口的TotalCount方法
@Override
public Integer totalCount(BizUser bizUser, Query query) throws BizException {
return query.getTotalCount();
}
//3.实现ExportProcessor接口的queryData方法
@Override
public List<Data> queryData(BizUser user, Query query, BizExportPage bizExportPage) throws BizException {
List<Data> dataList = new ArrayList<>();
Integer totalCount = query.getTotalCount();
for (int i = 1; i <= totalCount; i++) {
final Data data = new Data();
data.setId(i);
data.setName("name" + i);
dataList.add(data);
}
return dataList;
}
//4.实现ExportProcessor接口的convert方法
@Override
public List<View> convert(BizUser user, Query query, List<Data> data) throws BizException {
List<View> dataList = new ArrayList<>();
for (Data datum : data) {
View view = BeanUtils.cloneProp(datum, View.class);
dataList.add(view);
}
return dataList;
}
}
方式二:通过实现接口com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor#taskRuntimeConfig返回ExecuteType的值为"STANDALONE",可动态设置执行方式,此方式优先级比方式一高
@ExportSpecification(code = "StandaloneExportProcessor", name = "StandaloneExportProcessor")
public class StandaloneExportProcessor implements ExportProcessor<Query, Data, View> {
public BizExportTaskRuntimeConfig taskRuntimeConfig(BizUser user, Query query) throws BizException {
final BizExportTaskRuntimeConfigImpl runtimeConfig = new BizExportTaskRuntimeConfigImpl();
runtimeConfig.setExecuteType("STANDALONE");
return runtimeConfig;
}
}
方式一:ExportSpecification中通过executeType属性指定,设置为"CLUSTER"即为多机执行
import com.alibaba.ageiport.common.utils.BeanUtils;
import com.alibaba.ageiport.processor.core.annotation.ExportSpecification;
import com.alibaba.ageiport.processor.core.constants.ExecuteType;
import com.alibaba.ageiport.processor.core.exception.BizException;
import com.alibaba.ageiport.processor.core.model.api.BizExportPage;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor;
import com.alibaba.ageiport.test.processor.core.model.Data;
import com.alibaba.ageiport.test.processor.core.model.Query;
import com.alibaba.ageiport.test.processor.core.model.View;
import java.util.ArrayList;
import java.util.List;
@ExportSpecification(code = "ClusterExportProcessor", name = "StandaloneExportProcessor", executeType = ExecuteType.CLUSTER)
public class ClusterExportProcessor implements ExportProcessor<Query, Data, View> {
@Override
public Integer totalCount(BizUser bizUser, Query query) throws BizException {
return query.getTotalCount();
}
@Override
public List<Data> queryData(BizUser user, Query query, BizExportPage bizExportPage) throws BizException {
List<Data> dataList = new ArrayList<>();
Integer totalCount = query.getTotalCount();
for (int i = 1; i <= totalCount; i++) {
final Data data = new Data();
data.setId(i);
data.setName("name" + i);
dataList.add(data);
}
return dataList;
}
@Override
public List<View> convert(BizUser user, Query query, List<Data> data) throws BizException {
List<View> dataList = new ArrayList<>();
for (Data datum : data) {
View view = BeanUtils.cloneProp(datum, View.class);
dataList.add(view);
}
return dataList;
}
}
方式二:通过实现接口com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor#taskRuntimeConfig返回ExecuteType的值为"CLUSTER",可动态设置执行方式,此方式优先级比方式一高
@ExportSpecification(code = "ClusterExportProcessor", name = "StandaloneExportProcessor", executeType = ExecuteType.CLUSTER)
public class ClusterExportProcessor implements ExportProcessor<Query, Data, View> {
...
@Override
public BizExportTaskRuntimeConfig taskRuntimeConfig(BizUser user, Query query) throws BizException {
final BizExportTaskRuntimeConfigImpl runtimeConfig = new BizExportTaskRuntimeConfigImpl();
runtimeConfig.setExecuteType("CLUSTER");
return runtimeConfig;
}
}