背景
之前的项目中有一个数据迁移,原来的数据存储在旧的系统,现在系统做了重构,需要迁移到新的系统中,老系统的数据被加工到Excel中了,需要基于Excel实现文件的导入,同时需要避免内存溢出以及性能太低的问题。
问题分析
内存溢出问题
百万级别的数据量的 Excel 文件会非常大,如果全都加载到内存中,可能会导致内存溢出问题。
性能问题
百万级别数据从 Excel 读取并且插入到数据中, 可能会很慢,所以需要考虑性能来问题
错误处理
在文件的读取以及导入过程中,可能会出现各种各样的问题,我们要妥善解决好这些问题。
内存溢出问题
百万级别数据量,如果一次性读取到内存中,肯定是不现实的,那么最好的办法就是基于流式读取的方式进行分批处理。
在技术选型上面,我们选择使用 EasyExcel,特别是针对大数据量和复杂 Excel 文件的处理进行优化。在解析 Excel 的时候,EasyExcel 不会一次性将 Excel 一次性全部加载到内存中,而是从磁盘上面一行行进行读取,逐步进行解析。
性能问题
百万级别的数据量,如果采用单线程进行读取的话,性能会非常非常 low !!!
所以,针对这个问题,我们就需要使用多线程。
使用多线程主要涉及到两个场景:
多线程读取
多线程实现数据插入
这里就涉及到一个生产者—消费者的模式,通过使用多个线程进行读取,然后多个线程实现插入,从而最大限度地提升整体的性能来。
数据的插入,除了借助多线程以外,还可以同时使用数据库的批量插入的功能, 从而提高插入的速度。
错误处理
在文件的读取和数据库写入的过程中,需要解决各种各样的问题,比如数据格式错误、数据不一致、重复数据等问题。
所以我们需要分两步进行一个处理:
对数据进行检查,在开始插入之前将数据的格式问题提前检查好
在插入过程中,需要对异常进行处理
对于异常处理,我们的处理方式有很多,可以进行日志记录,也可以选择进行事务回滚,这个主要根据实际情况进行选择,一般情况下是不建议进行回滚的,直接自动重试,如果重试之后还是不行,则记录日志,再记录日志然后重新插入即可。
并且在这个过程中,需要考虑一下数据重复的问题,需要在 Excel 中某几个字段设置成数据库唯一约束,然后遇到数据冲突的情况,可以先进行处理,处理的方式可以是覆盖、跳过以及报错,这个根据业务的实际情况来确定,一般可以使用跳过+打印日志实现。
技术选型
在大文件的读取方面,EasyExcel更合适,因为他不会像POI一样耗内存,可以大大的减少内存占用。因为他并不会一次性把整个Excel都加载到内存中,而是逐行读取的。
同时考虑使用多线程来读取,这里就需要用到线程池的技术,直接用ExecutorService就行了。
因为还涉及到数据的批量写入,需要依赖mybatis或者mybatis-plus。
整体方案
使用 EasyExcel 实现 Excel 文件的读取,因为他不会一次性将整个 Excel 加载到内存中,是采取逐行读取的方式,并且为了提高并发性能,我们可以将数据先分散到不同的 sheet 中,然后借助线程池,多线程同时读取不同的sheet,在读取的过程中,借助 EasyExcel 的 ReadListener 进行处理。
在处理的过程中,我们并不会每一条数据都操作一次数据库,这样对于数据库的压力太大了,一般我们会设置一个批次,如 2000 条,我们从 Excel 中读取的数据暂时存储到内存中,这里可以使用 List 实现,在读取 2000 条数据,就可以执行一次数据的批量插入,这里可以使用 MyBatis 的批量插入以及 MP 的批量插入功能实现。
MyBatis 实现批量通过插入:https://juejin.cn/post/7016691244973686820
然后在这个过程中,需要考虑一些并发的问题,所以我们这里会使用一些线程安全的队列,比如 ConcurrentLinkedQueue
然后在经过验证以后,读取 100 w 的 Excel 并且插入数据之后,总耗时在 100 秒左右,用时不超过 2 分钟。
具体实现
为了提高并发处理能力,我们将百万级别的数据放到同一个 Excel 的不同 sheet 中, 然后通过使用 EasyExcel 并发读取这些 sheet。
EasyExcel 提供了 ReadListener 接口,允许在读取每一批数据之后自定义进行处理,我们可以基于这个功能来实现文件的分批读取。
依赖添加
<dependencies>
<!-- EasyExcel -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>最新的版本号</version>
</dependency>
<!-- 数据库连接和线程池 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>最新版本号</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
</dependencies>
代码实现
然后实现并发读取多个 sheet 代码:
@Service
public class ExcelImporterService {
@Autowired
private MyDataService myDataService;
public void doImport() {
// Excel文件的路径
String filePath = "users/workspace/excel/test.xlsx";
// 需要读取的sheet数量
int numberOfSheets = 20;
// 创建一个固定大小的线程池,大小与sheet数量相同
ExecutorService executor = Executors.newFixedThreadPool(numberOfSheets);
// 遍历所有sheets
for (int sheetNo = 0; sheetNo < numberOfSheets; sheetNo++) {
// 在Java lambda表达式中使用的变量需要是final
int finalSheetNo = sheetNo;
// 向线程池提交一个任务
executor.submit(() -> {
// 使用EasyExcel读取指定的sheet
EasyExcel.read(filePath, MyDataModel.class, new MyDataModelListener(myDataService))
.sheet(finalSheetNo) // 指定sheet号
.doRead(); // 开始读取操作
});
}
// 启动线程池的关闭序列
executor.shutdown();
// 等待所有任务完成,或者在等待超时前被中断
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// 如果等待过程中线程被中断,打印异常信息
e.printStackTrace();
}
}
}
这段代码通过创建一个固定大小的线程池来并发读取一个包含多个sheets的Excel文件。每个sheet的读取作为一个单独的任务提交给线程池。
我们在代码中用了一个MyDataModelListener,这个类是ReadListener的一个实现类。当EasyExcel读取每一行数据时,它会自动调用我们传入的这个ReadListener实例的invoke方法。在这个方法中,我们就可以定义如何处理这些数据。
MyDataModelListener还包含doAfterAllAnalysed方法,这个方法在所有数据都读取完毕后被调用。这里可以执行一些清理工作,或处理剩余的数据。
接下来,我们来实现这个 ReadListener:
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
// 自定义的ReadListener,用于处理从Excel读取的数据
public class MyDataModelListener implements ReadListener<MyDataModel> {
// 设置批量处理的数据大小
private static final int BATCH_SIZE = 1000;
// 用于暂存读取的数据,直到达到批量大小
private List<MyDataModel> batch = new ArrayList<>();
private MyDataService myDataService;
// 构造函数,注入MyBatis的Mapper
public MyDataModelListener(MyDataService myDataService) {
this.myDataService = myDataService;
}
// 每读取一行数据都会调用此方法
@Override
public void invoke(MyDataModel data, AnalysisContext context) {
//检查数据的合法性及有效性
if (validateData(data)) {
//有效数据添加到list中
batch.add(data);
} else {
// 处理无效数据,例如记录日志或跳过
}
// 当达到批量大小时,处理这批数据
if (batch.size() >= BATCH_SIZE) {
processBatch();
}
}
private boolean validateData(MyDataModel data) {
// 调用mapper方法来检查数据库中是否已存在该数据
int count = myDataService.countByColumn1(data.getColumn1());
// 如果count为0,表示数据不存在,返回true;否则返回false
if(count == 0){
return true;
}
// 在这里实现数据验证逻辑
return false;
}
// 所有数据读取完成后调用此方法
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
// 如果还有未处理的数据,进行处理
if (!batch.isEmpty()) {
processBatch();
}
}
// 处理一批数据的方法,重试次数超过 3,进行异常处理
private void processBatch() {
int retryCount = 0;
// 重试逻辑
while (retryCount < 3) {
try {
// 尝试批量插入
myDataService.batchInsert(batch);
// 清空批量数据,以便下一次批量处理
batch.clear();
break;
} catch (Exception e) {
// 重试计数增加
retryCount++;
// 如果重试3次都失败,记录错误日志
if (retryCount >= 3) {
logError(e, batch);
}
}
}
}
// 记录错误日志的方法
private void logError(Exception e, List<MyDataModel> failedBatch) {
// 在这里实现错误日志记录逻辑
// 可以记录异常信息和导致失败的数据
}
}
@Service
public class MyDataService{
// MyBatis的Mapper,用于数据库操作
@Autowired
private MyDataMapper myDataMapper;
// 使用Spring的事务管理进行批量插入
@Transactional(rollbackFor = Exception.class)
public void batchInsert(List<MyDataModel> batch) {
// 使用MyBatis Mapper进行批量插入
myDataMapper.batchInsert(batch);
}
public int countByColumn1(String column1){
return myDataMapper.countByColumn1(column1);
}
}
通过自定义这个 MyDataModelListener,我们就可以在读取Excel文件的过程中处理数据。
每读取到一条数据之后会把他们放入一个 List,当List中积累到1000条之后,进行一次数据库的批量插入,插入时如果失败了则重试,最后还是失败就打印日志。
这里批量插入,用到了 MyBatis 的批量插入,代码实现如下:
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface MyDataMapper {
void batchInsert(List<MyDataModel> dataList);
int countByColumn1(String column1);
}
mapper.xml 文件如下:
<insert id="batchInsert" parameterType="list">
INSERT INTO test_table_name (column1, column2, ...)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.column1}, #{item.column2}, ...)
</foreach>
</insert>
<select id="countByColumn1" resultType="int">
SELECT COUNT(*) FROM your_table WHERE column1 = #{column1}
</select>
基于 EasyExcel + 线程池解决 POI 文件导出溢出问题
背景
在 CRM 后台管理系统中,需要导出 Excel ,但是在处理大数据量的 Excel 文件导出的时候,常用的 Apache POI 库可能因为内存占用太高导致内存的溢出,同时,数据处理的过程中耗时非常久,从而导致用户等待时间过长或者请求超时,为了解决这些问题,采用了 EasyExcel + 线程池的解决方案。
原因探究:为什么 POI 会导致内存溢出?
首先由两个方面的原因,第一个方面就是 Excel 实际上没有我们看的得那么小,其在加载的时候采用了压缩的策略,另外一个方面就是 POI 文件的处理会将整个 Excel 文档加载到内存中,从而导致处理大型文件的时候导致内存溢出。
为什么 Excel 没有我们看起来那么小
我们常见的 Excel 文件其是是一个压缩文件,它是将若干个 XML 格式的纯文本压缩在一起,Excel 就是读取这些压缩文件的信息,最后展现为一个表格文件。
所以,如果我们把xlsx文件的后缀更改为.zip或.rar,再进行解压缩,就能提取出构成Excel的核心源码文件。解压后会发现解压后的文件中有3个文件夹和1个XML格式文件:
_rels 文件夹:里面的数据是一些基础的配置信息,比如 workbook 文件的位置等信息
docProps 文件夹:主要存储的是 sheet 的元信息,如果需要创建和编辑 sheet,一般修改的都是这个文件的内容
xl 文件夹:这个是最重要的一个文件夹,里面存放了 sheet 中的数据、行和列的格式、单元格格式、sheet 的配置信息等
综上所示,我们可以发现我们处理的 excel 文件实际上是一个经过高度压缩的文件格式,背后有许多的文件,所以看到的一个文件可能不止 2M,实际上会很大。
也就是说,我们在使用POI 处理 Excel 文件的时候,实际的大小可能远远大于我们所看见的大小,也就解释了为什么处理的文件只有 100 MB,最后实际内存占用却达到了 1GB 甚至更大。
POI 文件溢出的原理
这里我们拿一个示例:
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class ExcelReadTest {
public static void main(String[] args) {
// 指定要读取的文件路径
String filename = "example.xlsx";
try (FileInputStream fileInputStream = new FileInputStream(new File(filename))) {
// 创建工作簿对象
Workbook workbook = new XSSFWorkbook(fileInputStream);
// 获取第一个工作表
Sheet sheet = workbook.getSheetAt(0);
// 遍历所有行
for (Row row : sheet) {
// 遍历所有单元格
for (Cell cell : row) {
Thread.sleep(100);
// 根据不同数据类型处理数据
switch (cell.getCellType()) {
case STRING:
System.out.print(cell.getStringCellValue() + "\t");
break;
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
System.out.print(cell.getDateCellValue() + "\t");
} else {
System.out.print(cell.getNumericCellValue() + "\t");
}
break;
case BOOLEAN:
System.out.print(cell.getBooleanCellValue() + "\t");
break;
case FORMULA:
System.out.print(cell.getCellFormula() + "\t");
break;
default:
System.out.print(" ");
}
}
System.out.println();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
这里面使用了一个关键的 XSSFWorkbook 类:
public XSSFWorkbook(InputStream is) throws IOException {
this(PackageHelper.open(is));
}
public static OPCPackage open(InputStream is) throws IOException {
try {
return OPCPackage.open(is);
} catch (InvalidFormatException e){
throw new POIXMLException(e);
}
}
最后会调用 OPCPackage.open 方法
public static OPCPackage open(InputStream in) throws InvalidFormatException,
IOException {
OPCPackage pack = new ZipPackage(in, PackageAccess.READ_WRITE);
try {
if (pack.partList == null) {
pack.getParts();
}
} catch (InvalidFormatException | RuntimeException e) {
IOUtils.closeQuietly(pack);
throw e;
}
return pack;
}
然后我们可以看一下这个方法相关的注释:
/**
* Open a package.
*
* Note - uses quite a bit more memory than {@link #open(String)}, which
* doesn't need to hold the whole zip file in memory, and can take advantage
* of native methods
*
* @param in
* The InputStream to read the package from
* @return A PackageBase object
*
* @throws InvalidFormatException
* Throws if the specified file exist and is not valid.
* @throws IOException If reading the stream fails
*/
其中说了,这个方法会将整个压缩文件都加载到内存中,显而易见,就是将 Excel 的文档格式全都加载到内存中了,所以在处理小型文件的时候可能没什么影响,一旦处理大型文件,内存肯定是会溢出的。
技术选型
Excel 的导出有很多种方案,包括 POI、EasyExcel 以及 Hutool 等,市面上比较常用的解决方案主要是 POI 和 EasyExcel,然后在处理大文件这个方面,EasyExcel 更加适合一点,其内存占用更少,而且在处理大文件方面,EasyExcel 更加适合一些。
在文件导出的过程中,可以使用异步的方式去进行,用户不用一直等待,在异步文件生成之后,把文件上床到云存储中,再通知用户去下载就可以了。
然后云存储这块我们可以自由选择,这里选择使用阿里云 OSS,线程池异步处理采用 @Async
用户通知可以选择邮件通知的方式,这里使用 Spring Main 邮件发送。
具体实现
入口层
入口是一个Controller,主要接收用户的文件导出请求。
@RestController
@RequestMapping("/export")
public class DataExportController {
@Autowired
private ExcelExportService exportService;
@GetMapping("/data")
public ResponseEntity<String> exportData() {
List<DataModel> data = fetchData();
String fileUrl = exportService.exportDataAsync(data);
return ResponseEntity.ok("导出任务开始,文件生成后会通知您下载链接");
}
private List<DataModel> fetchData() {
// 获取需要导出的数据
}
}
然后在文件获取的时候,可能需要一些具体的数据获取,这个流程这里选就没有细说,可以结合业务场景进行实际的评估。
导出业务实现
这里主要有三个方面的内容:
使用 EasyExcel 生成文件
OSS 上传生成后的文件
Spring Mail 给用户发通知
@Service
public class ExcelExportService {
@Async("exportExecutor")
public String exportDataAsync(List<DataModel> data) {
// 生成 Excel 文件并获取 InputStream
InputStream fileContent = generateExcelFile(data);
String fileName = "data_" + System.currentTimeMillis() + ".xlsx";
// 上传到 OSS
String fileUrl = ossService.uploadFile(fileName, fileContent);
// 发送邮件
emailService.sendEmail(data.getUserEmail(), "文件导出通知", "您的文件已导出,下载链接: " + fileUrl);
return fileUrl;
}
private InputStream generateExcelFile(List<DataModel> data) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
ExcelWriterBuilder writerBuilder = EasyExcel.write(outputStream, DataModel.class);
writerBuilder.sheet("Data").doWrite(data);
} catch (Exception e) {
// 处理异常
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
// DataModel 类定义
public static class DataModel {
//省略参数及setter/getter
}
}
这里还用到了一个线程池的技术:
@Configuration
@EnableAsync
public class AsyncExecutorConfig {
@Bean("exportExecutor")
public Executor exportExecutor() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("registerSuccessExecutor-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(10, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
return executorService;
}
}
OSS上传服务部分代码实现如下,依赖阿里云OSS的API进行文件上传:
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.PutObjectRequest;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
public class OssService {
private String endpoint = "<OSS_ENDPOINT>";
private String accessKeyId = "<ACCESS_KEY_ID>";
private String accessKeySecret = "<ACCESS_KEY_SECRET>";
private String bucketName = "<BUCKET_NAME>";
public String uploadFile(String fileName, InputStream fileContent) {
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
try {
ossClient.putObject(new PutObjectRequest(bucketName, fileName, fileContent));
// 设置URL过期时间为1小时
Date expiration = new Date(System.currentTimeMillis() + 3600 * 1000);
URL url = ossClient.generatePresignedUrl(bucketName, fileName, expiration);
return url.toString();
} finally {
if (ossClient != null) {
ossClient.shutdown();
}
}
}
}
邮件发送代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.stereotype.Service;
@Service
public class EmailNotificationService {
@Autowired
private JavaMailSender mailSender;
public void sendEmail(String toAddress, String subject, String body) {
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom("noreply@example.com");
message.setTo(toAddress);
message.setSubject(subject);
message.setText(body);
mailSender.send(message);
}
}
配置文件参数
主要配置一下 Spring Mail 的配置信息:
spring.mail.host=192.168.1.11
spring.mail.port=8907
spring.mail.username=user@gmail.com
spring.mail.password=123456
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true