前言

在写下这篇文章的半个月前,我还在不断的反思自己的学习方式,质问自己为什么学不好多线程,为什么用不熟多线程。这个困扰我非常久的问题一直都没有得到解决,我刷过sgg、heima的线上课程,看过各种权威书籍,但是回过头来,我觉得自己完全没有基于这些知识梳理出一个可观测的知识体系。直到最近,我尝试从另外一个角度去学习Java的多线程:Java多线程的演化进程。我询问了GPT、搜索了很多相关文章,从Java的第一个发行版本按图索骥,每一个版本为多线程提供了什么内容,我感觉到前所未有的清晰。嗨嗨嗨~口水话也不想说太多,我们直接进入正文吧。

2022年09月22日,JDK19发布了,此版本最大的亮点就是支持虚拟线程,从此轻量级线程家族再添一员大将。虚拟线程使JVM摆脱了通过操作系统调度线程的束缚,由JVM自身调度线程。以此为引子,我们通过下列表格,了解一下各个发行版本的JDK为多线程提供了什么(虽然我们开头提到了JDK19,但是我们还是以截止到JDK8的发行版入手,因为8以后的版本确实用的不多):

版本

简述

JDK1.0

1.0作为Java的第一个版本,原生的支持了多线程。提供了java.lang.Thread 类和 Runnable 接口为基础的多线程编程模型奠定了框架。此时的多线程模型比较简单,开发者需要手动处理线程的创建、管理、和同步(Synchronized、Volatile、Object.wait、Object.notify)

JDK1.2

1.2提供了线程池的雏形,虽然 JDK 1.2 没有提供线程池的标准实现,但开发者开始意识到手动创建和管理线程的成本较高,于是很多开发者自定义线程池模型以提升性能;另外,ThreadLocal的引入,提供了线程局部存储,使变量在每个线程中隔离,避免了多个线程访问共享数据时的冲突。除此之外,Collections工具类的引入,为集合的线程安全提供了一种解决方案。

JDK1.4

虽然 JDK 1.4 并未对多线程进行重大改革,但它引入了NIO,允许异步和非阻塞操作,这对于可扩展的多线程网络应用程序至关重要。

JDK5

JSR133:JSR 133 重新明确了 Java 内存模型

JSR166:这是 Java 并发模型的一个重大转折点,Java并发模型的飞跃!Doug Lea大神横空出世!Java原生提供的不好用,我自己写一个!java.util.concurrent 包的引入使得多线程编程更加易用和高效。

JDK6

JDK6,对Synchronized底层的实现进行了优化,并提供了Fork/Join 框架的雏形。

JDK7

①Fork/Join 框架的引入专门用于任务递归拆分的并行处理,特别适用于 CPU 密集型任务的处理。这一框架允许将大任务分解成子任务,并通过 ForkJoinPool 并发处理,充分利用多核处理器。

②提供了新的线程协调工具Phaser,这是一个用于线程协同的高级工具,比 CyclicBarrier 更加灵活,支持多个阶段的并发任务协调

③对Locks框架进行了增强,使得 ReentrantLock 更加灵活和强大

JDK8

①JDK 8 引入了 CompletableFuture,为异步编程提供了强大的支持,支持非阻塞式的并发编程模型。开发者可以更容易地编写响应式和异步流式处理程序。

②Stream API的引入。虽然不是直接的并发工具,Stream API 提供了便捷的并行流处理支持,开发者可以通过 parallel() 轻松实现数据的并行化处理。

③引入了新的锁机制 StampedLock,它是一种更灵活和高效的读写锁,旨在解决并发编程中读写锁的性能瓶颈。

JDK1.0

JDK 1.0中创建线程的方式主要是继承Thread类或实现Runnable接口,通过对象实例的start方法启动线程,需要并行处理的代码放在run方法中,线程间的协作通信采用简单粗暴的stop/resume/suspend这样的方法。

如何解释stop/resume/suspend的概念呢?就是主线程可以直接调用子线程的终止,暂停,继续方法。如果你小时候用过随身听,上面有三个按键,终止,暂停,继续。想象一下你正在同时听3个随身听,三个随身听就是三个子线程,你就是主线程,你可以随意控制这三个设备的启停。

这一套机制有个致命的问题,就是容易发生死锁,原因在于当线程A锁定了某个资源,还未释放时,被主线程暂停了(suspend方法并不会释放锁),此时线程B如果想占有这个资源,只能等待线程A执行继续操作(resume)后释放资源,否则将永远得不到,发生死锁。

线程的创建方式

Thread类:

public class ThreadA extends Thread{
    @Override
    public void run() {
        System.out.println("ThreadA Run");
    }

    public static void main(String[] args) {
        ThreadA threadA = new ThreadA();
        threadA.start();
    }
}

Runnable接口:

public class ThreadB implements Runnable {

    @Override
    public void run() {
        System.out.println("RunThread");
    }

    public static void main(String[] args) {
        Runnable threadB = new ThreadB();
        new Thread(threadB).start();
    }
}

线程的通信、同步方式

Synchronized

public class InterruptWait extends Thread {
    public static Object lock = new Object();
 
    @Override
    public void run() {
        System.out.println("start");
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().isInterrupted());
                Thread.currentThread().interrupt(); // set interrupt flag again
                System.out.println(Thread.currentThread().isInterrupted());
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        Thread thread = new InterruptWait();
        thread.start();
        try {
            sleep(2000);
        } catch (InterruptedException e) {
        }
        thread.interrupt();
    }
}

Volatile(严格来说它并不能实现同步,这里我们暂时不深究)

public class Atomicity {
 
    private static volatile int nonAtomicCounter = 0;
    private static volatile AtomicInteger atomicCounter = new AtomicInteger(0);
    private static int times = 0;
 
    public static void caculate() {
        times++;
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    nonAtomicCounter++;
                    atomicCounter.incrementAndGet();
                }
            }).start();
        }
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
    }
 
    public static void main(String[] args) {
        caculate();
        while (nonAtomicCounter == 1000) {
            nonAtomicCounter = 0;
            atomicCounter.set(0);
            caculate();
        }
 
        System.out.println("Non-atomic counter: " + times + ":"
                + nonAtomicCounter);
        System.out.println("Atomic counter: " + times + ":" + atomicCounter);
    }
}

JDK1.2

粗暴的stop/resume/suspend机制在这个版本被禁止使用了,转而采用wait/notify/sleep这样的多条线程配合行动的方式。值得一提的是,在这个版本中,原子对象AtomicityXXX已经设计好了,主要是解决i++非原子性的问题。ThreadLocal和Collections的加入增加了多线程使用的姿势。

Object.wait\notify

public class Wait extends Thread {
    @Override
    public void run() {
        System.out.println("start");
        synchronized (this) { // wait/notify/notifyAll use the same
                                // synchronization resource
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace(); // notify won't throw exception
            }
        }
    }
 
    public static void main(String[] args) {
        Thread thread = new Wait();
        thread.start();
        try {
            sleep(2000);
        } catch (InterruptedException e) {
        }
        synchronized (thread) {
            System.out.println("Wait() will release the lock!");
            thread.notify();
        }
    }
}

ThreadLocal

ThreadLocal是一种采用无锁的方式实现多线程共享线程不安全对象的方案。它并不能解决“银行账户或库存增加、扣减”这类问题,它擅长将具有“工具”属性的类,通过复本的方式安全的执行“工具”方法。典型的如SimpleDateFormat、库连接等。值得一提的是它的设计非常巧妙,想像一下如果让你设计,一般的简单思路是:在ThreadLocal里维护一个全局线程安全的Map,key为线程,value为共享对象。这样设计有个弊端就是内存泄露问题,因为该Map会随着越来越多的线程加入而无限膨胀,如果要解决内容泄露,必须在线程结束时清理该Map,这又得强化GC能力了,显然投入产出比不合适。于是,ThreadLocal就被设计成Map不由ThreadLocal持有,而是由Thread本身持有。key为ThreadLocal变量,value为值。每个Thread将所用到的ThreadLoacl都放于其中。

public class ThreadLocalUsage extends Thread {
    public User user = new User();
 
    public User getUser() {
        return user;
    }
 
    @Override
    public void run() {
        this.user.set("var1");
 
        while (true) {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(this.user.get());
        }
    }
 
    public static void main(String[] args) {
 
        ThreadLocalUsage thread = new ThreadLocalUsage();
        thread.start();
 
        try {
            sleep(4000);
        } catch (InterruptedException e) {
        }
 
        thread.user.set("var2");
 
    }
}
class User {
 
    private static ThreadLocal<Object> enclosure = new ThreadLocal<Object>(); // is it must be static?
 
    public void set(Object object) {
        enclosure.set(object);
    }
 
    public Object get() {
        return enclosure.get();
    }
}

Collections

Collections工具类在这个版本被设计出来了,它包装了一些线程安全集合如SynchronizedList。在那个只有Hashtable、Vector、Stack等线程安全集合的年代,它的出现也是具有时代意义的。Collections工具的基本思想是我帮你将线程不安全的集合包装成线程安全的,这样你原有代码升级改造不必花很多时间,只需要在集合创建的时候用我提供方法初始化集合即可。

public class Main {

    public static void main(String[] args)  {
        //这块没啥想写的,就lang包里提供的数据结构实现类包了一层Synchronized,源码里非常的清晰
        List<String> strings = Collections.synchronizedList(new ArrayList<>());
        strings.forEach(System.out::println);
    }
}

JDK1.4

虽然 JDK 1.4 并未对多线程进行重大改革,但它引入了NIO,允许异步和非阻塞操作,这对于可扩展的多线程网络应用程序至关重要。

JDK5

JUC

Doug Lea,中文名为道格·利。是美国的一个大学教师,大神级的人物,J.U.C就是出自他之手。JDK1.5之前,我们控制程序并发访问同步代码只能使用synchronized,那个时候synchronized的性能还没优化好,性能并不好,控制线程也只能使用Object的wait和notify方法。这个时候Doug Lea给JCP提交了JSR-166的提案,在提交JSR-166之前,Doug Lea已经使用了类似J.U.C包功能的代码已经三年多了,这些代码就是J.U.C的原型。

J.U.C提供了原子化对象、锁及工具套装、线程池、线程安全容器等几大类工具。研发人员可灵活的使用任意能力搭建自己的产品,进可使用ReentrantLock搭建底层框架,退可直接使用现成的工具或容器进行业务代码编写。站在历史的角度去看,J.U.C在2004年毫无争议可以称为“尖端科技产品”。为Java的推广立下了悍马功劳。Java的自动档时代到来了,就好比自动档的汽车降低司机的门槛一样,J.U.C大大降低了程序员使用多线程的门槛。这是个开创了一个时代的产品。

当然J.U.C同样存在一结瑕疵:

CPU开销大:如果自旋CAS长时间地不成功,则会给CPU带来非常大的开销。

解决方案:在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。

ABA问题:如果一个值原来是A,变成了B,然后又变成了A,在CAS检查时会发现没有改变,但实际它已经改变,这就是ABA问题。大部分情况下ABA问题不会影响程序并发的正确性。

解决方案:每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。Java提供了AtomicStampedReference来解决。AtomicStampedReference通过包装[E,Integer]的元组来对对象标记版本戳(stamp),从而避免ABA问题。

只能保证一个共享变量原子操作:CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。

解决方案:比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。还可以考虑使用AtomicReference来包装多个变量,通过这种方式来处理多个共享变量的情况。

JMM

此版本的JDK重新明确了Java内存模型,在这之前,常见的内存模型包括连续一致性内存模型和先行发生模型。 对于连续一致性模型来说,程序执行的顺序和代码上显示的顺序是完全一致的。这对于现代多核,并且指令执行优化的CPU来说,是很难保证的。而且,顺序一致性的保证将JVM对代码的运行期优化严重限制住了。

但是此版本JSR 133规范指定的先行发生(Happens-before)使得执行指令的顺序变得灵活:

在同一个线程里面,按照代码执行的顺序(也就是代码语义的顺序),前一个操作先于后面一个操作发生
对一个monitor对象的解锁操作先于后续对同一个monitor对象的锁操作
对volatile字段的写操作先于后面的对此字段的读操作
对线程的start操作(调用线程对象的start()方法)先于这个线程的其他任何操作
一个线程中所有的操作先于其他任何线程在此线程上调用 join()方法
如果A操作优先于B,B操作优先于C,那么A操作优先于C

而在内存分配上,将每个线程各自的工作内存从主存中独立出来,更是给JVM大量的空间来优化线程内指令的执行。主存中的变量可以被拷贝到线程的工作内存中去单独执行,在执行结束后,结果可以在某个时间刷回主存: 但是,怎样来保证各个线程之间数据的一致性?JLS(Java Language Specification)给的办法就是,默认情况下,不能保证任意时刻的数据一致性,但是通过对synchronized、volatile和final这几个语义被增强的关键字的使用,可以做到数据一致性。

JDK6

作为“共和国长子”synchronized关键字,在5.0版本被ReentrantLock压过了风头。这个版本必须要扳回一局,因此JDK 6.0对锁做了一些优化,比如锁自旋、锁消除、锁合并、轻量级锁、所偏向等。本次优化是对“精细化管理”这个理念的一次诠释。没优化之前被synchronized加锁的对象只有两个状态:无锁,有锁(重量级锁)。优化后锁一共存在4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁、重量级锁。这几个状态随着竞争的情况逐渐升级,但是不能降级,目的是为了提高获取锁和释放锁的效率。

JDK7

Fork/Join的诞生也是一个比较先进的产品,它的核心竞争力在于,支持递归式的任务拆解,同时将各任务结果进行合并。但它是一个既熟悉又陌生的技术,熟悉在于它被应用到各种地方,比如接下来JDK8.0要讲的CompletableFuture和Stream;陌生在于我们似乎很少在业务研发过程中使用到它。

甚至有人甚至觉得它鸡肋。笔者的观点是,你如果是业务需求相关的研发,它是鸡肋的,因为基本用不到,大批数据量的场景有数仓那套工具,其它场景可以用线程池代替;如果你是中间件框架编写相关的研发,它不鸡肋,兴许会用到。中文互联网上很少有人质疑这项技术,但国外已经有人在讨论,感兴趣的可以直接跳转查阅 Is the Fork-Join framework in Java broken?

JDK8

此版本的发布对于Java来说是划时代的,以至于现在全世界在运行的Java程序里此版本占据了一半以上。但多线程相关的更新不如JDK5.0那么具有颠覆性。此版本除了增加了一些原子对象之外 ,最亮眼的便是以下两项更新。

CompletableFuture

网上关于CompletableFuture相关介绍很多,大多是讲它原理及怎么用。但是笔者始终不明白一个问题:为什么在有那么多线程池工具的情况下,还会有CompletableFuture的出现,它解决了什么痛点?它的核心竞争力到底是什么?相信你如果进行过思考也会提出这个问题,没关系,笔者已经帮你找到了答案。

结论:CompletableFuture的核心竞争力是任务编排。CompletableFuture继承Future接口特性,可以进行并发执行任务等特性这些能力都是有可替代性的。但它的任务编排能力无可替代,它的核心API中包括了构造任务链,合并任务结果等都是为了任务编排而设计的。所以JDK之所以在此版本引入此框架,主要是解决业务开发中越来越痛的任务编排需求。

最后多说一句,CompletableFuture底层使用了Fork/Join框架实现。

Stream

《架构整洁之道》里曾提到有三种编程范式,结构化编程(面向过程编程)、面向对象编程、函数式编程。Stream是函数式编程在Java语言中的一种体现,笔者认为,初级程序员向中级进阶的必经之路就是攻克Stream,初次接触Stream肯定特别不适应,但如果熟悉以后你将打开一个编程方式的新思路。作为研发人员经常混淆三个概念,函数式编程、Stream、Lambda表达式,总以为他们三个说的是一回事。以下是笔者的理解:

  • 函数式编程是一种编程思想,各种编程语言中都有该思想的实践

  • Stream是JDK8.0的一个新特性,也可以理解新造了个概念,目的就是迎合函数式编程这种思想,通过Stream的形式可以在集合类上实现函数式编程

  • Lambda 表达式(lambda expression)是一个匿名函数,通过它可以更简洁高效的表达函数式编程

那么说了这么多,Stream和多线程什么关系?Stream中的相关并行方法底层是使用了Fork/Join框架实现的。《Effective Java》中有一条相关建议“谨慎使用Stream并行”,理由就是因为所有的并行都是在一个通用的Fork/Join池中运行的,一个pipeline运行异常,可能损害其他不相关部分性能。