前言

看标题0v0

准备工作

1、建个SpringBoot项目,主要的依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!-- mybatis-plus-spring-boot3-starter -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
            <version>3.5.7</version>
        </dependency>
        <!-- mybatis-plus-extension -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>3.5.7</version>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--SpringBoot集成druid连接池druid-spring-boot-starter -->
        <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、配置文件

spring.application.name=MultipleDataInsertCase
# ========================alibaba.druid + mysql8===============
#druid的连接配置貌似有问题,开不了10个连接,导致任务阻塞了,所以我直接注释掉用默认的了
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql:///test?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=123456

# ========================mybatisPlus===============
#mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.type-aliases-package=com.hypo.multipledatainsertcase.entity

# ========================SQL Init===============
spring.sql.init.schema-locations=classpath:db/init.sql
spring.sql.init.mode=always

# ========================Json DateTimeFormat===============
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8

logging.level.com.hypo.multipledatainsertcase=info

3、Resource文件夹下创建个db/init.sql

drop table if exists test;
create table test
(
    id  bigint,
    name varchar(100),
    age  int,
    address varchar(100),
    mail varchar(100),
    phone varchar(100),
    primary key (id)
);

4、接着就是一键的代码生成,建议直接MybatisX一把梭,CURD没啥好写的

实体类

@Data
@Builder
@TableName("test")
public class TestEntity {

    @TableId
    private Long id;

    private String name;

    private Integer age;

    private String address;

    private String mail;

    private String phone;
}

Mapper层

public interface TestEntityMapper extends BaseMapper<TestEntity> {

    void deleteTable();
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hypo.multipledatainsertcase.mapper.TestEntityMapper">

    <resultMap id="BaseResultMap" type="com.hypo.multipledatainsertcase.entity.TestEntity">
            <id property="id" column="id" jdbcType="BIGINT"/>
            <result property="name" column="name" jdbcType="VARCHAR"/>
            <result property="age" column="age" jdbcType="INTEGER"/>
            <result property="address" column="address" jdbcType="VARCHAR"/>
            <result property="mail" column="mail" jdbcType="VARCHAR"/>
            <result property="phone" column="phone" jdbcType="VARCHAR"/>
    </resultMap>
    <delete id="deleteTable">
        delete
        from test;
    </delete>
</mapper>

Service层

public interface TestEntityService extends IService<TestEntity> {
    /**
     * 普通插入,在一个事务中
     */
    void singleThreadInsert();

    /**
     * 多线程事务批量插入,正常操作
     */
    void moreThreadInsert();

    /**
     * 多线程事务批量插入,故意出bug,演示要么一起成功,要么全体连坐
     */
    void moreThreadInsertFail();

    void delete();
}
@Service
public class TestEntityServiceImpl extends ServiceImpl<TestEntityMapper, TestEntity>
        implements TestEntityService {

    @Resource
    private TestEntityMapper testEntityMapper;

    private static final int CPU_CORE = Runtime.getRuntime().availableProcessors();

    public static List<TestEntity> data = generateData();

    private static List<TestEntity> generateData() {
        int userSize = 10 * 100000;
        List<TestEntity> testEntities = new ArrayList<>(userSize);
        for (long i = 1; i <= userSize; i++) {
            testEntities.add(TestEntity.builder()
                    .id(i)
                    .name("中国用户-" + i)
                    .age(20)
                    .phone("123456789")
                    .address("中国中国中国中国中国中国中国中国中国中国")
                    .mail("123456789@qq.com")
                    .build());
        }
        return testEntities;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void singleThreadInsert() {
        //单表同步插入100w
        testEntityMapper.insert(data);
    }

    @Resource
    private PlatformTransactionManager platformTransactionManager;

    @Override
    public void moreThreadInsert() {
        int dataList = CPU_CORE +1;
        // 分割列表
        List<List<TestEntity>> splitLists = splitList(data, dataList);

        // 打印每个部分的大小
        for (int i = 0; i < splitLists.size(); i++) {
            System.out.println("Part " + (i + 1) + " size: " + splitLists.get(i).size());
        }
        boolean success = MoreThreadTransactionUtils.execute(platformTransactionManager,
                splitLists.stream().map(list -> (Runnable) () -> {
                    testEntityMapper.insert(list);
                }).toArray(Runnable[]::new));

        if (success) {
            System.out.println("插入成功");
        } else {
            System.out.println("插入失败");
        }
    }

    @Override
    public void moreThreadInsertFail() {

        AtomicInteger taskCount = new AtomicInteger();
        // IO密集型乘2,CPU密集型+1
        int dataList = CPU_CORE + 1 ;
        // 分割列表
        List<List<TestEntity>> splitLists = splitList(data, dataList);

        // 打印每个部分的大小
        for (int i = 0; i < splitLists.size(); i++) {
            System.out.println("Part " + (i + 1) + " size: " + splitLists.get(i).size());
        }
        boolean success = MoreThreadTransactionUtils.execute(platformTransactionManager,
                splitLists.stream().map(list -> (Runnable) () -> {
                    //最后一个任务抛出异常
                    if (taskCount.getAndIncrement() == dataList) {
                        throw new RuntimeException("抛出异常");
                    }
                    testEntityMapper.insert(list);
                }).toArray(Runnable[]::new));

        if (success) {
            System.out.println("插入成功");
        } else {
            System.out.println("插入失败");
        }
    }

    @Override
    public void delete() {
        testEntityMapper.deleteTable();
    }

    private static List<List<TestEntity>> splitList(List<TestEntity> list, int numberOfParts) {
        List<List<TestEntity>> result = new ArrayList<>();
        int totalSize = list.size();
        int partSize = totalSize / numberOfParts;
        int remainder = totalSize % numberOfParts;

        int start = 0;
        for (int i = 0; i < numberOfParts; i++) {
            int end = start + partSize + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(list.subList(start, end)));
            start = end;
        }

        return result;
    }
}

核心工具类

@Slf4j
public class MoreThreadTransactionUtils {


    /**
     * 多线程事务处理
     * 多线程事务批量插入,故意出bug,演示要么一起成功,要么全体连坐
     *
     * @param platformTransactionManager Spring的事务管理器,用于控制事务的提交和回滚
     * @param taskList                   一个可变数量的Runnable任务,每个任务代表一个数据库操作
     * @return 如果所有任务都成功完成,则返回true;否则返回false
     */
    public static boolean execute(PlatformTransactionManager platformTransactionManager, Runnable... taskList) {
        if (taskList == null || taskList.length == 0) {
            throw new IllegalArgumentException("taskList is empty");
        }

        // 任务数量
        int taskSize = taskList.length;
        System.out.println("taskSize: " + taskSize);
        // 任务成功数量计数器
        AtomicInteger taskSuccessAccount = new AtomicInteger(0);

        List<Future<?>> taskFutureList = new ArrayList<>(taskSize);

        // 循环屏障,用于让多线程事务要么一起成功,要么全体连坐
        CyclicBarrier cyclicBarrier = new CyclicBarrier(taskSize);
        int i = 1;
        // 定义了一个线程池,线程池核心线程数大小和任务大小必须一致,因为里面的任务都必须同时去执行,否则会死锁
        ExecutorService threadPool = Executors.newFixedThreadPool(taskSize);

        try {
            //使用线程池执行循环处理任务,每个任何会交给线程池中的一个线程执行
            for (Runnable runnableThread : taskList) {
                final int taskIndex = i;

                Future<?> future = threadPool.submit(() -> {
                    TransactionStatus transactionStatus = null;
                    try {
                        // 使用spring编程式事务,开启事务
                        transactionStatus = platformTransactionManager.getTransaction(new DefaultTransactionAttribute());
                        log.info("task:{} start", taskIndex);
                        // 执行任务
                        runnableThread.run();
                        // 成功数量+1
                        taskSuccessAccount.incrementAndGet();
                        log.info("task:{} 等待提交事务", taskIndex);
                    } catch (Throwable e) {
                        log.error("task:{},执行异常,异常原因:{}", taskIndex, e.getMessage());
                    } finally {
                        try {
                            //运行到这cyclicBarrier阻塞发生,当池中所有任务都over才会被唤醒,继续向下走
                            cyclicBarrier.await();
                        } catch (Exception e) {
                            log.error("cyclicBarrier.await error:{}", e.getMessage(), e);
                        }
                    }

                    //根据事务返回的状态确认commit or  rollback
                    if (transactionStatus != null)
                    {
                        // 如果所有任务都成功(successAccount的值等于任务总数),则一起提交事务,如果有任何任务失败,则一起回滚事务
                        if (taskSuccessAccount.get() == taskSize) {
                            // 成功,提交事务
                            log.info("task:{} 提交事务", taskIndex);
                            platformTransactionManager.commit(transactionStatus);
                        } else {
                            //失败,回滚事务
                            log.error("task:{} 回滚事务", taskIndex);
                            platformTransactionManager.rollback(transactionStatus);

                            //todo
                            //出错消息发送到钉钉或者告警短信或者其它专属异常记录数据库,用xxl-job再
                            //定时重试,进入下一次排期
                        }
                    }
                });


                taskFutureList.add(future);

                i++;
            }

            for (Future<?> future : taskFutureList) {
                try
                {
                    future.get();
                } catch (Exception e) {
                    log.error("future.get error:{}", e.getMessage(), e);
                }
            }
        } finally {
            //关闭线程池
            threadPool.shutdown();
        }

        //如果所有任务都成功完成,则返回true;否则返回false
        return taskSuccessAccount.get() == taskSize;
    }

}

实践

写测试用例

@SpringBootTest
@Slf4j
class MultipleDataInsertCaseApplicationTests {

    @Resource
    private TestEntityService testService;


    /**
     * 单线程插入,普通插入,在一个事务中
     */
    @Test
    public void singleThreadInsert() {
        //清理数据
        this.testService.delete();
        long startTime = System.currentTimeMillis();
        this.testService.singleThreadInsert();
        System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));
        System.out.println("===============");
        System.out.println();
    }

    /**
     * 多线程事务批量插入,正常操作
     */
    @Test
    public void moreThreadInsert() {

        //清理之前已有数据
        this.testService.delete();
        long startTime = System.currentTimeMillis();
        this.testService.moreThreadInsert();
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
        System.out.println("===============");
        System.out.println();
    }

    /**
     * 多线程事务批量插入,故意出bug,演示要么一起成功,要么全体连坐
     */
    @Test
    public void moreThreadInsertFail() {
        //清理数据
        this.testService.delete();
        long startTime = System.currentTimeMillis();
        this.testService.moreThreadInsertFail();
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
    }
}

测试用例执行效果

单线程100w批量插入

耗时:30080ms

多线程100w批量插入(按CPU密集型配置核心线程数)

核心线程数:9

耗时:14837ms

多线程100w批量插入(按IO密集型配置核心线程数)

核心线程数:16

耗时:16760ms

结论

效率明显提升,但是其实可以更快,得出这个数据完全是基于我的机器的CPU、磁盘IO性能,我这随便一跑,CUP和磁盘的IO就拉满了,所以,如果换上性能更高的机器,会有更明显的速度提升。