Java多线程之---用 CountDownLatch 说明 AQS 的实现原理

本文基于 jdk 1.8 。

CountDownLatch 的使用

前面的文章中说到了 volatile 以及用 volatile 来实现自旋锁,例如 java.util.concurrent.atomic 包下的工具类。但是 volatile 的使用场景毕竟有限,很多的情况下并不是适用,这个时候就需要 synchronized 或者各种锁实现了。今天就来说一下几种锁的实现原理。

先来看一个最简单的 CountDownLatch 使用方法,例子很简单,可以运行看一下效果。CountDownLatch 的作用是:当一个线程需要另外一个或多个线程完成后,再开始执行。比如主线程要等待一个子线程完成环境相关配置的加载工作,主线程才继续执行,就可以利用 CountDownLatch 来实现。

例如下面这个例子,首先实例化一个 CountDownLatch ,参数可以理解为一个计数器,这里为 1,然后主线程执行,调用 worker 子线程,接着调用 CountDownLatch 的 await() 方法,表示阻塞主线程。当子线程执行完成后,在 finnaly 块调用 countDown() 方法,表示一个等待已经完成,把计数器减一,直到减为 0,主线程又开始执行。

private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException{
        System.out.println("主线程开始......");
        Thread thread = new Thread(new Worker());
        thread.start();
        System.out.println("主线程等待......");
        System.out.println(latch.toString());
        latch.await();
        System.out.println(latch.toString());
        System.out.println("主线程继续.......");
    }

    public static class Worker implements Runnable {

        @Override
        public void run() {
            System.out.println("子线程任务正在执行");
            try {
                Thread.sleep(2000);
            }catch (InterruptedException e){

            }finally {
                latch.countDown();
            }
        }
    }

执行结果如下:

主线程开始......
子线程任务正在执行
主线程等待......
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 1]
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 0]
主线程继续.......

AQS 的原理

这么好用的功能是怎么实现的呢,下面就来说一说实现它的核心技术原理 AQS。 AQS 全称 AbstractQueuedSynchronizer ,是 java.util.concurrent 中提供的一种高效且可扩展的同步机制。它可以用来实现可以依赖 int 状态的同步器,获取和释放参数以及一个内部FIFO等待队列,除了 CountDownLatchReentrantLockSemaphore 等功能实现都使用了它。

接下来用 CountDownLatch 来分析一下 AQS 的实现。建议看文章的时候先大致看一下源码,有助于理解下面所说的内容。

在我们的方法中调用 awit()countDown() 的时候,发生了几个关键的调用关系,我画了一个方法调用图。

图片描述

首先在 CountDownLatch 类内部定义了一个 Sync 内部类,这个内部类就是继承自 AbstractQueuedSynchronizer 的。并且重写了方法 tryAcquireSharedtryReleaseShared 。例如当调用 awit() 方法时,CountDownLatch 会调用内部类Sync 的 acquireSharedInterruptibly() 方法,然后在这个方法中会调用 tryAcquireShared 方法,这个方法就是 CountDownLatch 的内部类 Sync 里重写的 AbstractQueuedSynchronizer 的方法。调用 countDown() 方法同理。

这种方式是使用 AbstractQueuedSynchronizer 的标准化方式,大致分为两步:

1、内部持有继承自 AbstractQueuedSynchronizer 的对象 Sync;

2、并在 Sync 内重写 AbstractQueuedSynchronizer protected 的部分或全部方法,这些方法包括如下几个:

图片描述

之所以要求子类重写这些方法,是为了让使用者(这里的使用者指 CountDownLatch 等)可以在其中加入自己的判断逻辑,例如 CountDownLatch 在 tryAcquireShared 中加入了判断,判断 state 是否不为0,如果不为0,才符合调用条件。

tryAcquiretryRelease 是对应的,前者是独占模式获取,后者是独占模式释放。

tryAcquireSharedtryReleaseShared 是对应的,前者是共享模式获取,后者是共享模式释放。

我们看到 CountDownLatch 重写的方法 tryAcquireShared 实现如下:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

判断 state 值是否为0,为0 返回1,否则返回 -1。state 值是 AbstractQueuedSynchronizer 类中的一个 volatile 变量。

private volatile int state;

在 CountDownLatch 中这个 state 值就是计数器,在调用 await 方法的时候,将值赋给 state 。

等待线程入队

根据上面的逻辑,调用 await() 方法时,先去获取 state 的值,当计数器不为0的时候,说明还有需要等待的线程在运行,则调用 doAcquireSharedInterruptibly 方法,进来执行的第一个动作就是尝试加入等待队列 ,即调用 addWaiter()方法, 源码如下:

到这里就走到了 AQS 的核心部分,AQS 用内部的一个 Node 类维护一个 CHL Node FIFO 队列。将当前线程加入等待队列,并通过 parkAndCheckInterrupt()方法实现当前线程的阻塞。下面一大部分都是在说明 CHL 队列的实现,里面用 CAS 实现队列出入不会发生阻塞。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //加入等待队列                      
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        // 进入 CAS 循环
        try {
            for (;;) {
                //当一个节点(关联一个线程)进入等待队列后, 获取此节点的 prev 节点 
                final Node p = node.predecessor();
                // 如果获取到的 prev 是 head,也就是队列中第一个等待线程
                if (p == head) {
                    // 再次尝试申请 反应到 CountDownLatch 就是查看是否还有线程需要等待(state是否为0)
                    int r = tryAcquireShared(arg);
                    // 如果 r >=0 说明 没有线程需要等待了 state==0
                    if (r >= 0) {
                        //尝试将第一个线程关联的节点设置为 head 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //经过自旋tryAcquireShared后,state还不为0,就会到这里,第一次的时候,waitStatus是0,那么node的waitStatus就会被置为SIGNAL,第二次再走到这里,就会用LockSupport的park方法把当前线程阻塞住
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

我看看到上面先执行了 addWaiter() 方法,就是将当前线程加入等待队列,源码如下:

/** Marker to indicate a node is waiting in shared mode */
 static final Node SHARED = new Node();
 /** Marker to indicate a node is waiting in exclusive mode */
 static final Node EXCLUSIVE = null;

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速入队操作,因为大多数时候尾节点不为 null
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法
        enq(node);
        return node;
    }

上面是向等待队列中添加等待者(waiter)的方法。首先构造一个 Node 实体,参数为当前线程和一个mode,这个mode有两种形式,一个是 SHARED ,一个是 EXCLUSIVE,请看上面的代码。然后执行下面的入队操作 addWaiter,和 enq() 方法的 else 分支操作是一样的,这里的操作如果成功了,就不用再进到 enq() 方法的循环中去了,可以提高性能。如果没有成功,再调用 enq() 方法。

private Node enq(final Node node) {
        // 死循环+CAS保证所有节点都入队
        for (;;) {
            Node t = tail;
            // 如果队列为空 设置一个空节点作为 head
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //加入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

说明:循环加 CAS 操作是实现乐观锁的标准方式,CAS 是为了实现原子操作而出现的,所谓的原子操作指操作执行期间,不会受其他线程的干扰。Java 实现的 CAS 是调用 unsafe 类提供的方法,底层是调用 c++ 方法,直接操作内存,在 cpu 层面加锁,直接对内存进行操作。

上面是 AQS 等待队列入队方法,操作在无限循环中进行,如果入队成功则返回新的队尾节点,否则一直自旋,直到入队成功。假设入队的节点为 node ,上来直接进入循环,在循环中,先拿到尾节点。

1、if 分支,如果尾节点为 null,说明现在队列中还没有等待线程,则尝试 CAS 操作将头节点初始化,然后将尾节点也设置为头节点,因为初始化的时候头尾是同一个,这和 AQS 的设计实现有关, AQS 默认要有一个虚拟节点。此时,尾节点不在为空,循环继续,进入 else 分支;

2、else 分支,如果尾节点不为 null, node.prev = t ,也就是将当前尾节点设置为待入队节点的前置节点。然后又是利用 CAS 操作,将待入队的节点设置为队列的尾节点,如果 CAS 返回 false,表示未设置成功,继续循环设置,直到设置成功,接着将之前的尾节点(也就是倒数第二个节点)的 next 属性设置为当前尾节点,对应 t.next = node 语句,然后返回当前尾节点,退出循环。

setHeadAndPropagate 方法负责将自旋等待或被 LockSupport 阻塞的线程唤醒。

private void setHeadAndPropagate(Node node, int propagate) {
        //备份现在的 head
        Node h = head;  
        //抢到锁的线程被唤醒 将这个节点设置为head
        setHead(node)
        // propagate 一般都会大于0 或者存在可被唤醒的线程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 只有一个节点 或者是共享模式 释放所有等待线程 各自尝试抢占锁
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

Node 对象中有一个属性是 waitStatus ,它有四种状态,分别是:

//线程已被 cancelled ,这种状态的节点将会被忽略,并移出队列
static final int CANCELLED =  1;
// 表示当前线程已被挂起,并且后继节点可以尝试抢占锁
static final int SIGNAL    = -1;
//线程正在等待某些条件
static final int CONDITION = -2;
//共享模式下 无条件所有等待线程尝试抢占锁
static final int PROPAGATE = -3;

等待线程被唤醒

当执行 CountDownLatch 的 countDown()方法,将计数器减一,也就是state减一,当减到0的时候,等待队列中的线程被释放。是调用 AQS 的 releaseShared 方法来实现的,下面代码中的方法是按顺序调用的,摘到了一起,方便查看:

// AQS类
public final boolean releaseShared(int arg) {
        // arg 为固定值 1
        // 如果计数器state 为0 返回true,前提是调用 countDown() 之前不能已经为0
        if (tryReleaseShared(arg)) {
            // 唤醒等待队列的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

// CountDownLatch 重写的方法
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 依然是循环+CAS配合 实现计数器减1
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

/// AQS类
 private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null &amp;&amp; h != tail) {
                int ws = h.waitStatus;
                // 如果节点状态为SIGNAL,则他的next节点也可以尝试被唤醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 将节点状态设置为PROPAGATE,表示要向下传播,依次唤醒
                else if (ws == 0 &amp;&amp;
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

因为这是共享型的,当计数器为 0 后,会唤醒等待队列里的所有线程,所有调用了 await() 方法的线程都被唤醒,并发执行。这种情况对应到的场景是,有多个线程需要等待一些动作完成,比如一个线程完成初始化动作,其他5个线程都需要用到初始化的结果,那么在初始化线程调用 countDown 之前,其他5个线程都处在等待状态。一旦初始化线程调用了 countDown ,其他5个线程都被唤醒,开始执行。

总结

1、AQS 分为独占模式和共享模式,CountDownLatch 使用了它的共享模式。

2、AQS 当第一个等待线程(被包装为 Node)要入队的时候,要保证存在一个 head 节点,这个 head 节点不关联线程,也就是一个虚节点。

3、当队列中的等待节点(关联线程的,非 head 节点)抢到锁,将这个节点设置为 head 节点。

4、第一次自旋抢锁失败后,waitStatus 会被设置为 -1(SIGNAL),第二次再失败,就会被 LockSupport 阻塞挂起。

5、如果一个节点的前置节点为 SIGNAL 状态,则这个节点可以尝试抢占锁。

不妨到我的公众号里互动一下 :古时的风筝

扫码关注