java百万级mysql数据导入excel(多线程)

java百万级mysql数据导入excel(多线程)

文章目录

  • java百万级mysql数据导入excel(多线程)
      • navicat
      • poi
      • easyExcel
        • 选择原因
        • 快速上手
          • 单线程写入excel
          • 多线程写入excel
          • 单线程读出excel
          • 多线程读出excel
        • 分析性能
      • 总结

大家好,最近写项目有一个mysql导入excel的需求。
那么今天从下面三个方面来实现这个需求,分别是白嫖navicat功能,poi,easyExcel

其中easyExcel方案会使用多线程加快导入导出效率

navicat

如果不是程序需求的话,可以通过navicat提供的《导出向导》实现。

这里可以选择各种格式的导入导出

选择要导出的表,并指定保存路径,点击《下一步》

选择要导出的字段,一般默认为全部字段

然后一直无脑下一步,点击开始,查看xlxs文件。

那如果想用程序语言在项目中如何实现呢?

poi

POI提供API给Java程序对Microsoft Office格式档案读和写的功能

官方地址:https://poi.apache.org/overview.html

API:https://poi.apache.org/apidocs/4.1/

maven引入依赖包

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi -->
<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi</artifactId>
    <version>4.1.2</version>
</dependency>

tips:HSSFWorkbook类 是poi对Microsoft Excel 2003 XLS的支持。也就是.xls文件,而现在 XSSFWorkbook类对.xlxs 进行支持。

所以一般选择4.12版本或之后。

由于easyExcel提供了更强大,更灵活的execl导入导出功能,基本项目里不怎么用poi,所以这里给出一个jdbc+poi来实现的源码。本人已经调试过没有问题。读者可以跟着源码debug一遍,不再赘述。

https://github.com/LHJ-8023/mysql_excel_utils

easyExcel

选择原因

**第一点:内存消耗提升。**由于poi在Excel解压缩时内存消耗依然很大,easyExcel重写底层excel解析,一个3M的excel用POI sax解析依然需要100M左右内存直接降为几M。

第二点:简单易用。easyExcel在上层做了模型转换的封装,让使用者更加简单方便

最近正在对easyExcel源码解析,可以关注我等待更新

github地址:https://github.com/alibaba/easyexcel

文档地址:https://easyexcel.opensource.alibaba.com/docs/current/

快速上手
单线程写入excel

这里我将UserTest这个domain,通过它的mapper和mabatis的分页插件,按照pageSize=10000来分页查出,并将其write到一个.xlsx的一个个sheet中。

代码如下:

@SpringBootTest
public class WriteTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteTest.class);
    @Resource
    private UserTestMapper userTestMapper;
    @Test
    void simpleWrite() throws IOException {
        //单线程插入,每一页是一个sheet插入到一个.xlsx文件中
        String fileName = "user.xlsx";
        OutputStream outputStream = new FileOutputStream(fileName);
        int pageSize=10000;
        int pageNum=1;
        List<UserTest> data;
        int i=0;
        long current=System.currentTimeMillis();
        try (ExcelWriter excelWriter = EasyExcel.write(outputStream, UserTest.class).build()) {
            do {
                // 每次都要创建writeSheet 这里注意必须指定sheetNo 而且sheetName必须不一样
                WriteSheet writeSheet = EasyExcel.writerSheet(i, "模板" + i).build();
                // 分页去数据库查询数据 这里可以去数据库查询每一页的数据
                PageHelper.startPage(pageNum,pageSize);
                data = userTestMapper.selectByExample(new UserTestExample());
                excelWriter.write(data, writeSheet);
                pageNum++;
                i++;
            }while (data.size()>0);
        }
        long end=System.currentTimeMillis();
        System.out.println("耗时:"+(end-current)+"ms");//10w => 2180ms 100w=>23550ms
        outputStream.close();
    }

结果:

注意这是在@SpringBootTest下。选择ioc注入单例mapper的方式来直接进行操作

这里使用PageHelper为mybatis分页插件

多线程写入excel

由于标题百万数据的量级,若单线程去消耗io速度太慢,可以通过多线程让cpu参与以加快性能。

思路:一百万条数据,分为100个线程去同时分页查,每一页10000条(也就是每个sheet一万条)。

然后等待所有线程查询完毕后,在main线程中调用easyExcel的api将其写入。

流程图:

在这里插入图片描述

代码:

    @Test
    void SimpleWriteByThread() throws IOException, InterruptedException {
        /**
         * 多线程插入:开启100个进程去同时分页查,然后放到一个map中,阻塞起来,然后main去遍历这个map,然后遍历
         */
        long current=System.currentTimeMillis();
        String fileName = "user.xlsx";
        OutputStream outputStream = new FileOutputStream(fileName);
        //记录<分页页数,对应数据>的map
        Map<Integer, List<UserTest>> map = new HashMap<>();
        //定义一个线程执行Service,100个固定的线程池。
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        long count = userTestMapper.countByExample(new UserTestExample());
        int pages=100;
        int pageSize= (int) (count/pages);
        //开启一个countDownLatch计数器,等到各线程执行完成后再执行main线程
        CountDownLatch countDownLatch = new CountDownLatch(pages);
        for (int i = 0; i < pages; i++) {
            int currentI=i;
            executorService.submit(() -> {
                //每个线程分页查,并将data put到map中
                PageHelper.startPage(currentI+1,pageSize);
                List<UserTest> userTests = userTestMapper.selectByExample(new UserTestExample());
                map.put(currentI,userTests);
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        //main唤醒,遍历map,将map的key作为sheet数,将value插入各sheet
        try (ExcelWriter excelWriter = EasyExcel.write(outputStream, UserTest.class).build()){
            for (Map.Entry<Integer,List<UserTest>> entry : map.entrySet()){
                Integer pageNum = entry.getKey();
                List<UserTest> pageData = entry.getValue();
                WriteSheet writeSheet = EasyExcel.writerSheet(pageNum, "模板" + pageNum).build();
                excelWriter.write(pageData, writeSheet);
            }
        }
        long end=System.currentTimeMillis();
        System.out.println("耗时:"+(end-current)+"ms");//10w => 2345ms 100w => 8809ms
        outputStream.close();
    }

Q:为什么每个线程分页查出来后就用EasyExcel.writerSheet存入呢?用ThreadLocal也行啊

A:easyExcel的bug,不能多线程异步同时写入execl

https://github.com/alibaba/easyexcel/issues/3550

结果:上图一致。

单线程读出excel

由于read操作在本人项目没有体现,这里只给出简单实现。

EasyExcel采用监听机制,

当read到一条数据时,会调用invoke方法,在这里你可以进行数据库和逻辑操作,

当一个sheet读完后,会调用doAfterAllAnalysed方法

这里选择单线程批量插入。

代码:

@Test
void read1(){
    long startTime = System.currentTimeMillis();
    EasyExcel.read(fileName, UserTest.class, new UserTestListener(userTestMapper)).sheet().doRead();
    long endTime = System.currentTimeMillis();
    long duration = endTime - startTime;
    LOGGER.info("Execution time of read1 method: {} milliseconds", duration);
}
//UserTestListener.java

@Slf4j
public class UserTestListener<U> implements ReadListener<UserTest> {

    /**
     * 每隔x条存储数据库,实际使用中可以100条,然后清理list ,方便内存回收
     */
    private static final int BATCH_COUNT = 1000;
    /**
     * 缓存的数据
     */
    private ThreadLocal<List<UserTest>> cachedDataList = ThreadLocal.withInitial(ArrayList::new);
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * 假设这个是一个DAO,当然有业务逻辑这个也可以是一个service。当然如果不用存储这个对象没用。
     */
    private UserTestMapper userTestMapper;
    private static AtomicInteger count=new AtomicInteger(1);

    /**
     * 如果使用了spring,请使用这个构造方法。每次创建Listener的时候需要把spring管理的类传进来
     *
     * @param userTestMapper
     */
    public UserTestListener(UserTestMapper userTestMapper) {
        this.userTestMapper = userTestMapper;
    }

    @Override
    public void extra(CellExtra extra, AnalysisContext context) {
        ReadListener.super.extra(extra, context);
    }

    /**
     * 所有数据解析完成了 都会来调用
     *
     * @param context
     */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        saveData();
        log.info("所有数据解析完成!");
    }

    @Override
    public boolean hasNext(AnalysisContext context) {
        return ReadListener.super.hasNext(context);
    }

    /**
     * 加上存储数据库
     */
    private void saveData() {
        if(cachedDataList.get().size()!=0){
            userTestMapper.saveBatchByNative(cachedDataList.get());
            log.info("第"+count.incrementAndGet()+"次插入:"+cachedDataList.get().size()+"条数据");
        }
        cachedDataList.get().clear();
    }

    @Override
    public void onException(Exception exception, AnalysisContext context) throws Exception {
        ReadListener.super.onException(exception, context);
    }

    @Override
    public void invokeHead(Map<Integer, ReadCellData<?>> headMap, AnalysisContext context) {
        ReadListener.super.invokeHead(headMap, context);
    }

    @Override
    public void invoke(UserTest userTest, AnalysisContext analysisContext) {
        log.info("解析到一条数据:{}", JSON.toJSONString(userTest));
        cachedDataList.get().add(userTest);
        if (cachedDataList.get().size() >= BATCH_COUNT) {
            saveData();
//            asyncSaveData();

        }
    }
}
多线程读出excel

在main线程中,创建100个线程去进行read操作,每个线程都会经历下面的流程:

解析到一条数据 => 达到batchSize后存入数据库 => 解析完成 => 输出日志

实例代码如下:

    @Test
    void read2() throws InterruptedException {
        ArrayList<Callable<Object>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            tasks.add(()->{
                EasyExcel.read(fileName, UserTest.class, new UserTestListener(userTestMapper)).sheet(finalI).doRead();
                return null;
            });
        }
        executorService.invokeAll(tasks);
        System.out.println("read2 main线程结束");
    }

如果想再性能优化,可以选择在“达到batchSize后存入数据库 ”这里想办法

比如创建的这100个子线程中,每一个都会解析 => 保存 => 然后再解析,一直到解析完成。

那其实可以在解析成功后,在这100个子线程中再各自去创建1个子线程去专门执行存入数据库的操作,这样效率最快。

实例代码如下:

//userTestListener.java中定义
    private void asyncSaveData() {
        if(cachedDataList.get().size()!=0){
            executorService.execute(new saveTask(cachedDataList.get(),this));
            cachedDataList.get().clear();
        }
    }
    static class saveTask implements Runnable{
        private List<UserTest> userTests;
        private UserTestListener<UserTest> userTestListener;

        public saveTask(List<UserTest> userTests, UserTestListener userTestListener) {
            this.userTests = userTests;
            this.userTestListener = userTestListener;
        }

        @Override
        public void run() {
            userTestListener.userTestMapper.saveBatchByNative(userTests);
            log.info("第"+count.incrementAndGet()+"次插入:"+userTests.size()+"条数据");
        }
    }

注意:这样的效果不一定就好到哪里去,和机器cpu核心数和操作系统有关,但我实测是加快30%。

注意:对于上述定义的cachedDataList务必使用ThreadLocal包装,否则各线程会对其数据污染。

分析性能

这里给出笔者在插入单表100万数据导入导出的方案耗时记录
在这里插入图片描述

总结

本文介绍了3种mysql表导出excel的方式

  • 如果不需要写程序实现则使用现成数据库连接工具,如navicat的导出功能
  • 如果想理解excel和mysql表底层数据结构可以使用poi去实现(毕竟easyExcel的底层很多也是poi的东西
  • 如果企业项目等建议使用easyExcel,开源,内存占用小,也较安全。