前言
看标题0v0
准备工作
1、建个SpringBoot项目,主要的依赖:
<!-- Main Maven dependencies of the demo project. -->
<!-- Entries without a <version> presumably inherit it from the Spring Boot parent/BOM (not shown here) - confirm. -->
<dependencies>
<!-- Spring MVC web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JDBC starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<!-- Spring AOP starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- mybatis-plus-spring-boot3-starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.7</version>
</dependency>
<!-- mybatis-plus-extension -->
<!-- NOTE(review): may already be pulled in transitively by the starter above - confirm before keeping it explicit. -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-extension</artifactId>
<version>3.5.7</version>
</dependency>
<!-- MySQL JDBC driver, only needed at runtime -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Druid connection pool integration for Spring Boot: druid-spring-boot-starter -->
<!-- NOTE(review): the boot3 MyBatis-Plus starter suggests Spring Boot 3; verify whether druid-spring-boot-3-starter is the artifact intended for that version. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.23</version>
</dependency>
<!-- Lombok (compile-time code generation), marked optional so it is not passed on to dependants -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot test support, test scope only -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2、配置文件
# Application name
spring.application.name=MultipleDataInsertCase
# ========================alibaba.druid + mysql8===============
# The Druid connection settings seemed to be problematic: the pool could not open 10 connections,
# which left the insert tasks blocked, so the Druid data source type is commented out and the
# default connection pool is used instead.
# NOTE(review): possibly caused by Druid's default max-active being lower than the number of
# worker threads - confirm before re-enabling.
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# rewriteBatchedStatements=true is passed to the MySQL driver so batched inserts can be rewritten.
spring.datasource.url=jdbc:mysql:///test?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
# NOTE(review): plain-text demo credentials - use only against a local test database.
spring.datasource.username=root
spring.datasource.password=123456
# ========================mybatisPlus===============
# Uncomment the next line to print the executed SQL to stdout.
#mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.type-aliases-package=com.hypo.multipledatainsertcase.entity
# ========================SQL Init===============
# Schema script db/init.sql is executed at startup; mode=always enables it unconditionally.
spring.sql.init.schema-locations=classpath:db/init.sql
spring.sql.init.mode=always
# ========================Json DateTimeFormat===============
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
# Log level for the application's own packages
logging.level.com.hypo.multipledatainsertcase=info
3、在 resources 文件夹(src/main/resources)下创建 db/init.sql
-- Recreate the demo table from scratch: any existing `test` table (and its rows) is dropped first.
drop table if exists test;
-- Target table of the bulk-insert demo; `id` is the primary key.
create table test
(
id bigint,
name varchar(100),
age int,
address varchar(100),
mail varchar(100),
phone varchar(100),
primary key (id)
);
4、接着就是一键的代码生成,建议直接MybatisX一把梭,CRUD没啥好写的
实体类
/**
 * Row of the {@code test} table used by the bulk-insert demo.
 * <p>
 * Accessors and the builder are generated by Lombok ({@code @Data}, {@code @Builder}).
 */
@Data
@Builder
@TableName("test")
public class TestEntity {
/** Primary key of the row. */
@TableId
private Long id;
/** Value of the {@code name} column. */
private String name;
/** Value of the {@code age} column. */
private Integer age;
/** Value of the {@code address} column. */
private String address;
/** Value of the {@code mail} column. */
private String mail;
/** Value of the {@code phone} column. */
private String phone;
}
Mapper层
/**
 * MyBatis-Plus mapper for {@link TestEntity}; inherits the generic operations of
 * {@code BaseMapper} and adds one custom statement.
 */
public interface TestEntityMapper extends BaseMapper<TestEntity> {
/**
 * Executes the {@code deleteTable} statement of the mapper XML, which removes all rows
 * from the {@code test} table.
 */
void deleteTable();
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements bound to the TestEntityMapper interface. -->
<mapper namespace="com.hypo.multipledatainsertcase.mapper.TestEntityMapper">
<!-- Column-to-property mapping for TestEntity; no statement in this file references it. -->
<resultMap id="BaseResultMap" type="com.hypo.multipledatainsertcase.entity.TestEntity">
<id property="id" column="id" jdbcType="BIGINT"/>
<result property="name" column="name" jdbcType="VARCHAR"/>
<result property="age" column="age" jdbcType="INTEGER"/>
<result property="address" column="address" jdbcType="VARCHAR"/>
<result property="mail" column="mail" jdbcType="VARCHAR"/>
<result property="phone" column="phone" jdbcType="VARCHAR"/>
</resultMap>
<!-- Removes every row of table `test` (no WHERE clause); the table itself is kept. -->
<delete id="deleteTable">
delete
from test;
</delete>
</mapper>
Service层
/**
 * Service contract of the bulk-insert demo: one single-threaded scenario, two multi-threaded
 * scenarios and a clean-up operation.
 */
public interface TestEntityService extends IService<TestEntity> {
/**
 * Plain insert, executed inside a single transaction.
 */
void singleThreadInsert();
/**
 * Multi-threaded transactional batch insert, normal (happy-path) run.
 */
void moreThreadInsert();
/**
 * Multi-threaded transactional batch insert with a deliberately injected failure, demonstrating
 * that the tasks either all succeed together or are all rolled back together.
 */
void moreThreadInsertFail();
/**
 * Clean-up operation; the test cases call it before each scenario to remove previously
 * inserted data.
 */
void delete();
}
/**
 * Demo service that bulk-inserts a pre-generated data set into the {@code test} table, either
 * on the calling thread inside one transaction or split across several worker threads whose
 * transactions are committed or rolled back together.
 */
@Service
public class TestEntityServiceImpl extends ServiceImpl<TestEntityMapper, TestEntity>
        implements TestEntityService {

    /** Number of processors reported by the JVM; used to size the multi-threaded scenarios. */
    private static final int CPU_CORE = Runtime.getRuntime().availableProcessors();

    /** Rows shared by every insert scenario; generated once when the class is initialised. */
    public static List<TestEntity> data = generateData();

    @Resource
    private TestEntityMapper testEntityMapper;

    @Resource
    private PlatformTransactionManager platformTransactionManager;

    /**
     * Builds the in-memory data set: 1,000,000 rows with ids {@code 1..1_000_000}.
     *
     * @return the generated rows
     */
    private static List<TestEntity> generateData() {
        int userSize = 10 * 100000;
        List<TestEntity> testEntities = new ArrayList<>(userSize);
        for (long i = 1; i <= userSize; i++) {
            testEntities.add(TestEntity.builder()
                    .id(i)
                    .name("中国用户-" + i)
                    .age(20)
                    .phone("123456789")
                    .address("中国中国中国中国中国中国中国中国中国中国")
                    .mail("123456789@qq.com")
                    .build());
        }
        return testEntities;
    }

    /**
     * Inserts the whole data set synchronously on the calling thread, inside one transaction.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void singleThreadInsert() {
        testEntityMapper.insert(data);
    }

    /**
     * Splits the data set into {@code CPU_CORE + 1} parts and inserts each part on its own
     * thread via {@link MoreThreadTransactionUtils#execute}.
     */
    @Override
    public void moreThreadInsert() {
        List<List<TestEntity>> parts = splitAndReport(CPU_CORE + 1);
        Runnable[] tasks = parts.stream()
                .map(part -> (Runnable) () -> testEntityMapper.insert(part))
                .toArray(Runnable[]::new);
        reportOutcome(MoreThreadTransactionUtils.execute(platformTransactionManager, tasks));
    }

    /**
     * Same as {@link #moreThreadInsert()} but one task throws on purpose, so the whole run is
     * expected to be rolled back.
     */
    @Override
    public void moreThreadInsertFail() {
        // Rule of thumb used by the author: IO-bound -> cores * 2, CPU-bound -> cores + 1.
        List<List<TestEntity>> parts = splitAndReport(CPU_CORE + 1);
        final int taskTotal = parts.size();
        AtomicInteger startedTasks = new AtomicInteger();
        Runnable[] tasks = parts.stream()
                .map(part -> (Runnable) () -> {
                    // The last task to start throws. incrementAndGet() yields 1..taskTotal, so
                    // exactly one task hits the limit; the previous getAndIncrement() yielded
                    // 0..taskTotal-1 and therefore never triggered the failure.
                    if (startedTasks.incrementAndGet() == taskTotal) {
                        throw new RuntimeException("抛出异常");
                    }
                    testEntityMapper.insert(part);
                })
                .toArray(Runnable[]::new);
        reportOutcome(MoreThreadTransactionUtils.execute(platformTransactionManager, tasks));
    }

    /**
     * Removes all rows from the {@code test} table through the mapper's {@code deleteTable}
     * statement.
     */
    @Override
    public void delete() {
        testEntityMapper.deleteTable();
    }

    /** Splits the shared data set into {@code partCount} parts and prints the size of each part. */
    private static List<List<TestEntity>> splitAndReport(int partCount) {
        List<List<TestEntity>> parts = splitList(data, partCount);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("Part " + (i + 1) + " size: " + parts.get(i).size());
        }
        return parts;
    }

    /** Prints whether the multi-threaded insert succeeded. */
    private static void reportOutcome(boolean success) {
        System.out.println(success ? "插入成功" : "插入失败");
    }

    /**
     * Splits {@code list} into {@code numberOfParts} consecutive, independent copies whose sizes
     * differ by at most one; the first {@code size % numberOfParts} parts get the extra element.
     *
     * @param list          the rows to split
     * @param numberOfParts number of parts to produce, must be positive
     * @return the parts, in original order
     * @throws IllegalArgumentException if {@code numberOfParts} is not positive
     */
    private static List<List<TestEntity>> splitList(List<TestEntity> list, int numberOfParts) {
        if (numberOfParts <= 0) {
            throw new IllegalArgumentException("numberOfParts must be positive: " + numberOfParts);
        }
        List<List<TestEntity>> result = new ArrayList<>(numberOfParts);
        int totalSize = list.size();
        int partSize = totalSize / numberOfParts;
        int remainder = totalSize % numberOfParts;
        int start = 0;
        for (int i = 0; i < numberOfParts; i++) {
            int end = start + partSize + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(list.subList(start, end)));
            start = end;
        }
        return result;
    }
}
核心工具类
@Slf4j
public class MoreThreadTransactionUtils {
/**
* 多线程事务处理
* 多线程事务批量插入,故意出bug,演示要么一起成功,要么全体连坐
*
* @param platformTransactionManager Spring的事务管理器,用于控制事务的提交和回滚
* @param taskList 一个可变数量的Runnable任务,每个任务代表一个数据库操作
* @return 如果所有任务都成功完成,则返回true;否则返回false
*/
public static boolean execute(PlatformTransactionManager platformTransactionManager, Runnable... taskList) {
if (taskList == null || taskList.length == 0) {
throw new IllegalArgumentException("taskList is empty");
}
// 任务数量
int taskSize = taskList.length;
System.out.println("taskSize: " + taskSize);
// 任务成功数量计数器
AtomicInteger taskSuccessAccount = new AtomicInteger(0);
List<Future<?>> taskFutureList = new ArrayList<>(taskSize);
// 循环屏障,用于让多线程事务要么一起成功,要么全体连坐
CyclicBarrier cyclicBarrier = new CyclicBarrier(taskSize);
int i = 1;
// 定义了一个线程池,线程池核心线程数大小和任务大小必须一致,因为里面的任务都必须同时去执行,否则会死锁
ExecutorService threadPool = Executors.newFixedThreadPool(taskSize);
try {
//使用线程池执行循环处理任务,每个任何会交给线程池中的一个线程执行
for (Runnable runnableThread : taskList) {
final int taskIndex = i;
Future<?> future = threadPool.submit(() -> {
TransactionStatus transactionStatus = null;
try {
// 使用spring编程式事务,开启事务
transactionStatus = platformTransactionManager.getTransaction(new DefaultTransactionAttribute());
log.info("task:{} start", taskIndex);
// 执行任务
runnableThread.run();
// 成功数量+1
taskSuccessAccount.incrementAndGet();
log.info("task:{} 等待提交事务", taskIndex);
} catch (Throwable e) {
log.error("task:{},执行异常,异常原因:{}", taskIndex, e.getMessage());
} finally {
try {
//运行到这cyclicBarrier阻塞发生,当池中所有任务都over才会被唤醒,继续向下走
cyclicBarrier.await();
} catch (Exception e) {
log.error("cyclicBarrier.await error:{}", e.getMessage(), e);
}
}
//根据事务返回的状态确认commit or rollback
if (transactionStatus != null)
{
// 如果所有任务都成功(successAccount的值等于任务总数),则一起提交事务,如果有任何任务失败,则一起回滚事务
if (taskSuccessAccount.get() == taskSize) {
// 成功,提交事务
log.info("task:{} 提交事务", taskIndex);
platformTransactionManager.commit(transactionStatus);
} else {
//失败,回滚事务
log.error("task:{} 回滚事务", taskIndex);
platformTransactionManager.rollback(transactionStatus);
//todo
//出错消息发送到钉钉或者告警短信或者其它专属异常记录数据库,用xxl-job再
//定时重试,进入下一次排期
}
}
});
taskFutureList.add(future);
i++;
}
for (Future<?> future : taskFutureList) {
try
{
future.get();
} catch (Exception e) {
log.error("future.get error:{}", e.getMessage(), e);
}
}
} finally {
//关闭线程池
threadPool.shutdown();
}
//如果所有任务都成功完成,则返回true;否则返回false
return taskSuccessAccount.get() == taskSize;
}
}
实践
写测试用例
/**
 * Integration tests that time the three insert scenarios of {@link TestEntityService}.
 * Each test clears previously inserted data first and prints the elapsed wall-clock time.
 */
@SpringBootTest
@Slf4j
class MultipleDataInsertCaseApplicationTests {
    @Resource
    private TestEntityService testService;

    /**
     * Single-threaded scenario: plain insert inside one transaction.
     */
    @Test
    public void singleThreadInsert() {
        // Start from an empty table.
        testService.delete();
        final long begin = System.currentTimeMillis();
        testService.singleThreadInsert();
        final long elapsed = System.currentTimeMillis() - begin;
        System.out.println("耗时(ms):" + elapsed);
        System.out.println("===============");
        System.out.println();
    }

    /**
     * Multi-threaded transactional batch insert, normal run.
     */
    @Test
    public void moreThreadInsert() {
        // Remove data left over from earlier runs.
        testService.delete();
        final long begin = System.currentTimeMillis();
        testService.moreThreadInsert();
        final long elapsed = System.currentTimeMillis() - begin;
        System.out.println("----costTime: " + elapsed + " 毫秒");
        System.out.println("===============");
        System.out.println();
    }

    /**
     * Multi-threaded transactional batch insert with a deliberately injected failure, showing
     * that the tasks either all succeed together or are all rolled back together.
     */
    @Test
    public void moreThreadInsertFail() {
        // Start from an empty table.
        testService.delete();
        final long begin = System.currentTimeMillis();
        testService.moreThreadInsertFail();
        final long elapsed = System.currentTimeMillis() - begin;
        System.out.println("----costTime: " + elapsed + " 毫秒");
    }
}
测试用例执行效果
单线程100w批量插入
耗时:30080ms
多线程100w批量插入(按CPU密集型配置核心线程数)
核心线程数:9
耗时:14837ms
多线程100w批量插入(按IO密集型配置核心线程数)
核心线程数:16
耗时:16760ms
结论
效率明显提升,但是其实可以更快,得出这个数据完全是基于我的机器的CPU、磁盘IO性能,我这随便一跑,CPU和磁盘的IO就拉满了,所以,如果换上性能更高的机器,会有更明显的速度提升。