侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 94 篇文章
  • 累计创建 42 个标签
  • 累计收到 1 条评论

CountDownLatch源码解析

qingtian
2020-11-09 / 0 评论 / 0 点赞 / 1,184 阅读 / 11,290 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2020-11-09,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

CountDownLatch源码解析

在上一篇转载的文章从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队中我们了解到AQS与ReentrantLock的联系,其实不仅仅是ReentrantLock,JUC下的很多组件都与AQS有着或多或少的联系.

下面我来介绍一下CountDownLatch,权当狗尾续貂.

CountDownLatch介绍

CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。

用法

  1. 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownlatch.countDown(),当计数器的值变为 0 时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

示例

public class CountDownLatchExample1 {
  // 请求的数量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表达式的运用
        try {
          test(threadnum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } finally {
          countDownLatch.countDown();// 表示一个请求已经被完成
        }

      });
    }
    countDownLatch.await();
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模拟请求的耗时操作
  }
}

上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");

与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。


上面介绍了CountDownLatch 的基本概念和一些用法,下面分析CountDownLatch的源码

CountDownLatch源码

经过上一篇文章的介绍相信大家已经知道AQS有两种锁模式,分别是独占模式和共享模式,从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队.而CountDownLatch就是利用的AQS的共享模式实现其功能.

下面是共享模式的流程示意图:

共享模式

其中state变量记录剩余的共享次数,每次尝试获取锁并成功时,将state变量自减1,用以记录锁的同步状态.

源码

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    //Sync为自定义同步器,继承于AbstractQueuedSynchronizer
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        //根据传入的count值量设置state变量,同步次数
        Sync(int count) {
            setState(count);
        }

        //得到state状态
        int getCount() {
            return getState();
        }

        //自实现tryAcquireShared方法,获取锁方法
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        //自实现tryReleaseShared,释放锁方法
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    .......
    .......
        
}

构造函数

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

初始化CountDownLatch,通过传入的count值创建一个自实现的同步器,此同步器是继承AQS的.

CountDownLatch的基本方法

public void await() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

可以看到CountDownLatch.await()方法内部调用了sync.acquireSharedInterruptibly(1)

进入此方法.....

到达AbstractQueuedSynchronizer......

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在acquireSharedInterruptibly方法中,判断tryAcquireShared(arg) < 0,如果为true则执行doAcquireSharedInterruptibly(arg)方法

其中tryAcquireShared()方法为CountDownLatch内部同步器的自实现方法,我们回头看

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

在此方法中判断CountDownLatchstate变量是否 == 0;如果等于0返回值为1,否则返回值为-1.

如果此时state变量大于0,则返回值为-1

进入doAcquireSharedInterruptibly(arg)方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    	//添加一个等待节点在AQS的队尾
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            //进入一个无限循环
            for (;;) {
                //得到node节点的前驱节点
                final Node p = node.predecessor();
                //如果p为队首节点
                if (p == head) {
                    //尝试获取锁,此时得到的state值是否>0,意味着锁是否还有剩余的共享次数
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //如果锁剩余的共享次数大于0,node节点可以成功获取锁
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //与AQS中一样,判断在锁竞争的过程中
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

有没有觉得这个方法很眼熟!

与前一篇文章中,介绍AQS中获取锁的方法类似,

判断在获取锁的过程中,在竞争失败后是否需要将此线程挂起.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

以下是流程图

9af16e2481ad85f38ca322a225ae737535740

总结: 在初始化CountDownLatch时根据传入的count值初始化一个同步器,count值是其中的state值,在每次尝试获取锁时,先判断state值是否 == 0,

  1. 如果等于0,此时锁的剩余数量为0,无法获取共享锁
  2. 否则返回-1,执行doAcquireSharedInterruptibly(arg)方法,并开始尝试获取锁
  3. 在不断尝试获取锁的过程中,判断是否将当前线程挂起

下面看到countDown()方法

public void countDown() {
    sync.releaseShared(1);
}

方法中调用自定义同步器的releaseShared()方法

此时回头看自定义同步器的releaseShared()方法,进入AQS的releaseShared()方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

看其中CountDownLatch自实现的tryReleaseShared()方法

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

先进入一个无限循环,每调用一次tryReleaseShared()方法就将其中的state变量的值进行自减,直到将state值减为零时返回true .

回头看releaseShared()方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //返回true
            doReleaseShared();
            return true;
        }
        return false;
    }

调用doReleaseShared()方法

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        //得到AQS队列的头节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //得到node节点的waitStatus
            if (ws == Node.SIGNAL) {
                //通过CAS操作将当前node节点的ws值修改为0,0为一个node节点刚初始化的状态
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒该节点的后继节点,使它进入锁的竞争中
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

总结:

  1. 每次调用countDown方法时,将调用AQS的releaseShared()方法,
  2. 在releaseShared()方法中调用countDownLatch自定义同步器中的tryReleaseShared(),将state变量值不断进行自减,直到state值变为0
  3. 调用doReleaseShared()方法,唤醒后继节点.

最后

通过本篇文章我们知道了CountDownLatch的基本用法;并且还知道了CountDownLatch的功能实现主要是依靠AQS同步器,并在此基础上自实现了tryAcquireShared()方法和tryReleaseShared()方法

  1. 关于获取锁的方法:

    在state变量自减为零时,等待的线程开始竞争锁

  2. 关于释放锁的方法

    在调用countDown方法时,每次将state变量的值进行自减,直到state的值减为0时,唤醒等待的后续节点.

0

评论区