基于注解的异步导入导出系统

December 17, 2023
测试
测试
测试
测试
13 分钟阅读

相信大家做Excel导入导出功能,都会遇到大数据量超时问题。一般解决方法,采用异步操作,但每次都需要自己写异步的代码,为了减少重复不必要的工作,我决定开发一套基于注解的导入导出功能,并且支持异步操作。

基于EasyExcel封装,我们先来看使用。

使用中

Excel导出

@EasyExcelExport(asyncSize = 2, excelPoolBeanName = "taskAsyncExecutor")
public List<OverallMaterialExportVo> exportExcel(HttpServletResponse response, String checkCode) {
    Page<OverallMaterialVo> voPage = pageByCheckCode(checkCode, null);
    List<OverallMaterialVo> voList = voPage.getData();
    return BeanUtils.copy(voList, OverallMaterialExportVo.class);
}

@EasyExcelExport注解来用于Excel导出,asyncSize表示开启异步导出的数量,excelPoolBeanName

为异步需要的线程池bean名称。

异步导出会先将Excel文件异步导出到文件系统,用户再从文件系统中下载。获取文件列表的接口为

@Test
public void testExcelPage() {
    Page<ExportToDocumentVo> exportToDocumentVoPage = EasyExcelApi.pageCurrentExportInfo(1, 10);
    testOverLog(null);
}

该接口为静态调用,获取前租户当前登录用户下导出的文件信息。

Excel导入

@EasyExcelImport(resultClass = OverallMaterialImportVo.class, needAsync = true)
public List<OverallMaterialVo> importMaterialWithAnnotation(MultipartFile file, List<OverallMaterialImportVo> importVoList) {
    String testDivision = "xxxxx";
    StopWatch start = TimeWatchUtils.start();
    ZvosBaseExceptionAssert.checkArgument(CollectionUtils.isNotEmpty(importVoList), "导入数据不能为空");
    return afterImportHandler(start, testDivision, importVoList);
}

使用注解@EasyExcelImportresultClass为导入返回的实体listneedAsync用于控制是否开启异步导入,默认fasle

那么使用这个注解的方法格式为:

@EasyExcelImport(resultClass = U.class)
public List<T> functionA(MultipartFile file, List<U> list) {
    //直接使用List<U> list做业务逻辑
}

List<U> list为导出的数据,functionA方法中,注解里面会将导入的Excel读入到入参中List<U> list,开发者只需要将list拿来做业务操作即可。

图解原理

异步导出功能

异步导入功能

导入

如上图,异步操作都是新启一个线程进行导入导出,这样主线程和子线程一部分操作能并行执行,可以效解决超时问题。

代码解析

导出

我们先来看@EasyExcelExport注解定义

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EasyExcelExport {

    /**
     * 当数量超过asyncSize时,系统转为异步导出的方式
     * 默认 1000
     * @return
     */
    int asyncSize() default 1000;

    /**
     * 导出的最大数量
     * 默认10000
     * @return
     */
    int maxSize() default 10000;

    /**
     * 异步导出文件保存时间,默认1小时
     * @return
     */
    int saveTime() default 1;

    String excelPoolBeanName() default StringUtils.EMPTY;
    
}

这里有几个参数

  1. asyncSize:开启异步导出的数量,当要导出的数量超过这个值时,开启异步导出功能。默认1000条。
  2. maxSize:允许导出的最大数量,超过这个数量。系统抛异常提示。默认一万条。
  3. saveTime:下载列表的保存时间,导出信息存到Redis的保留时间,默认1小时。
  4. excelPoolBeanName:异步操作时使用的线程池bean名称,默认为系统自带线程池,也可以用户自定义。

系统默认线程池如下:

@Configuration
@ConditionalOnProperty(name = "basic-service.excel.pool.provider", havingValue = "default")
public class ExcelPoolTask {
    @Bean("scExcelThreadPool")
    public ExecutorService scExcelThreadPool() {
        int cpuNum = Runtime.getRuntime().availableProcessors();
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1000);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("sc-async-excel-pool-%d").build();
        return new ThreadPoolExecutor(10 * cpuNum, 30 * cpuNum,
                                      40, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

同步导出和普通的EasyExcel导出没有什么区别,这里主要说明异步导出。

if (size > asyncSize) {
    //异步导出接口...
    String excelPoolBean = annotation.excelPoolBeanName();
    Executor excelThreadPool = null;
    if (StringUtils.isNotBlank(excelPoolBean)) {
        excelThreadPool = (Executor) SpringScContextHolder.getBean(excelPoolBean);
    }
    //生成任务id
    String taskId = SNOW_FLAKE.nextIdStr();
    EasyExcelApi.startExportToDocument(fileName, taskId, saveIime);
    DocumentInfo info = EasyExcelApi.asyncExportExcel2Document(response, list, resultOne.getClass(), fileName, sheetName, excelThreadPool, taskId);
    info.setTaskId(taskId);
    info.setSaveTime(saveIime);
    EasyExcelApi.endExportToDocument(info);
    log.info("异步导出,上传到文件系统,文件信息:【{}】", JsonUtils.toJson(info));
    return;
}

每次异步导出,我们都会生成一个任务,记录在Redis中,记录导出初始状态。导出结束后,更新记录导出结束状态。

 //开始导出
 EasyExcelApi.startExportToDocument(fileName, taskId, saveIime);
 //结束导出
 EasyExcelApi.endExportToDocument(info);

我们来看看代码EasyExcelApi.asyncExportExcel2Document

ByteArrayOutputStream arrayOutputStream = asyncExportExcel(response, list, clazz, fileName, sheetName, taskAsyncExecutor, taskId);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
MultipartFile mFile = new MockMultipartFile(fileName + ".xlsx", fileName + ".xlsx", ConstantsString.APPLICATION_OCTET_STREAM, byteArrayInputStream);
ScUploadFileResponse upload = documentApiClient.upload(mFile);
String fileId = upload.getFileId();
return documentApiClient.getDocumentInfoByFileId(fileId);

上述代码我们做两件事:

  1. 执行导出
  2. 导出文件上传到文件系统中,并返回文件信息

那么我们看看核心代码asyncExportExcel

这里我使用两个线程。线程1,线程2,线程1将导出的数据推入到队列中

BlockingQueue<List<? extends BaseRowModel>> queue = new ArrayBlockingQueue<>(countSize);

partitions.stream().<Runnable>map(item -> () -> {
    try {
     queue.put(item);
    } catch (Exception e) {
     log.error("导出文件异常:", e);
    } finally {
     countDownLatch.countDown();
    }
    log.info("asyncExportExcel:数据放入队列中,线程名称:【{}】", Thread.currentThread().getName());
}).forEach(taskAsyncExecutor::execute);

采用倒计数器CountDownLatch控制线程的启停。

taskAsyncExecutor.execute(() -> {
    log.info("asyncExportExcel:数据从队列中取出,线程名称:【{}】", Thread.currentThread().getName());
    int sheetNo = 0;
    while (true) {
        List<? extends BaseRowModel> consumerList = null;
        try {
         consumerList = queue.take();
        } catch (InterruptedException e) {
         Thread.interrupted();
        }

        long count = countDownLatch.getCount();
        WriteSheet writeSheet = EasyExcel.writerSheet(sheetNo, sheetName.concat("_").concat(String.valueOf(sheetNo)))
        .head(clazz)
        .registerWriteHandler(new Custemhandler())
        .registerWriteHandler(getStyleStrategy())
        .build();

        excelWriter.write(consumerList, writeSheet);
        sheetNo++;
        takeQueueCountDownLatch.countDown();
        if (count == 0 && queue.size() == 0) {
         break;
        }
        if (countSize / sheetNo == 2) {
            ExportToDocumentVo vo = getExportToDocumentVoByTaskId(taskId);
            vo.setProgress("50");
            createOrUpdateToRedis(vo);
        }
    }
});

上面的线程做的操作为,循环将队列的数据取出,然后写入到Excel文件中,消费的数据量为总数据量的一半时,我们更新下Redis的进度信息

if (countSize / sheetNo == 2) {
    ExportToDocumentVo vo = getExportToDocumentVoByTaskId(taskId);
    vo.setProgress("50");
    createOrUpdateToRedis(vo);
}

导出的主要代码就是这样的了。

导入

导入的代码相对较简短,我们来看看注解定义:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EasyExcelImport {

    /**
     *
     * 导入导入方式,是否需要异步导入
     * 默认false
     *
     * @return
     */
    boolean needAsync() default false;

    /**
     * 从excel 的第几行读取数据,从去掉标题行算起。
     * 默认从第2行算起
     * @return
     */
    int headRowNumber() default 1;

    /**
     * 异步的线程池
     *
     * @return
     */
    String excelPoolBeanName() default StringUtils.EMPTY;

    //excel解析类
    Class resultClass();

    //excel 监听器
    Class listener() default ImportScExcelListener.class;
}

属性定义说明如下:

  1. headRowNumber:从第几行读取Excel数据,一般第一行默认为标题行,如果headRowNumber值为1,说明是从第2行开始读取数据。这里默认为1,从Excel第二行读取数据。
  2. needAsync:是否开启异步导入,默认不开启。
  3. excelPoolBeanName:开启异步需要使用的线程池bean名称,可自定义,默认使用系统线程池,和导出线程池为同一个。
  4. resultClass:导入到内存后的实体类class。
  5. listener:监听器,可用于导入时的业务校验,异常监听。可自定义监听器,默认ImportScExcelListener

导入代码如下:

if (needAsync) {
 return EasyExcelApi.importExcelAsync(file, point);
} else {
    List list = EasyExcelApi.importExcel(file, resultClass, headRowNumber, listener);
    return point.proceed(new Object[]{file, list});
}

开启异步时,启动了一个线程执行导入操作:

CompletableFuture<Object> future = new CompletableFuture<>();
excelThreadPool.execute(() -> {
    log.info("开始异步导入,当前线程:【{}】", Thread.currentThread().getName());
    List list = importExcel(mFile, resultClass, headRowNumber, listener);
    try {
        Object proceed = point.proceed(new Object[]{file, list});
        future.complete(proceed);
    } catch (Throwable throwable) {
     log.error("导入后执行业务逻辑异常", throwable);
     throwable.printStackTrace();
    }
    log.info("异步导入结束,当前线程:【{}】", Thread.currentThread().getName());
    });
//todo 异步考虑将执行结果信息入库
return future.get();

至此导入导出操作介绍完毕

继续阅读

更多来自我们博客的帖子

如何安装 BuddyPress
由 测试 December 17, 2023
经过差不多一年的开发,BuddyPress 这个基于 WordPress Mu 的 SNS 插件正式版终于发布了。BuddyPress...
阅读更多
Filter如何工作
由 测试 December 17, 2023
在 web.xml...
阅读更多
如何理解CGAffineTransform
由 测试 December 17, 2023
CGAffineTransform A structure for holding an affine transformation matrix. ...
阅读更多