CountDownLatch源码解析
在上一篇转载的文章从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队中我们了解到AQS与ReentrantLock的联系,其实不仅仅是ReentrantLock,JUC下的很多组件都与AQS有着或多或少的联系.
下面我来介绍一下CountDownLatch,权当狗尾续貂.
CountDownLatch介绍
CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。
CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared
方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。
用法
- 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n :
new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减 1countdownlatch.countDown()
,当计数器的值变为 0 时,在CountDownLatch上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。 - 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的
CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先coundownlatch.await()
,当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
示例
public class CountDownLatchExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown();// 表示一个请求已经被完成
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");
。
与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await()
方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch
对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()
方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()
方法,恢复执行自己的任务。
上面介绍了CountDownLatch 的基本概念和一些用法,下面分析CountDownLatch的源码
CountDownLatch源码
经过上一篇文章的介绍相信大家已经知道AQS有两种锁模式,分别是独占模式和共享模式,从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队.而CountDownLatch就是利用的AQS的共享模式实现其功能.
下面是共享模式的流程示意图:
其中state变量记录剩余的共享次数,每次尝试获取锁并成功时,将state变量自减1,用以记录锁的同步状态.
源码
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
//Sync为自定义同步器,继承于AbstractQueuedSynchronizer
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//根据传入的count值量设置state变量,同步次数
Sync(int count) {
setState(count);
}
//得到state状态
int getCount() {
return getState();
}
//自实现tryAcquireShared方法,获取锁方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//自实现tryReleaseShared,释放锁方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
.......
.......
}
构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
初始化CountDownLatch
,通过传入的count值创建一个自实现的同步器,此同步器是继承AQS的.
CountDownLatch的基本方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
可以看到CountDownLatch.await()
方法内部调用了sync.acquireSharedInterruptibly(1)
进入此方法.....
到达AbstractQueuedSynchronizer......
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在acquireSharedInterruptibly方法中,判断tryAcquireShared(arg) < 0
,如果为true
则执行doAcquireSharedInterruptibly(arg)
方法
其中tryAcquireShared()方法为CountDownLatch内部同步器的自实现方法,我们回头看
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
在此方法中判断CountDownLatch
的state
变量是否 == 0;如果等于0返回值为1,否则返回值为-1.
如果此时state
变量大于0,则返回值为-1
进入doAcquireSharedInterruptibly(arg)
方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加一个等待节点在AQS的队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//进入一个无限循环
for (;;) {
//得到node节点的前驱节点
final Node p = node.predecessor();
//如果p为队首节点
if (p == head) {
//尝试获取锁,此时得到的state值是否>0,意味着锁是否还有剩余的共享次数
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果锁剩余的共享次数大于0,node节点可以成功获取锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//与AQS中一样,判断在锁竞争的过程中
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
有没有觉得这个方法很眼熟!
与前一篇文章中,介绍AQS中获取锁的方法类似,
判断在获取锁的过程中,在竞争失败后是否需要将此线程挂起.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
以下是流程图
总结: 在初始化
CountDownLatch
时根据传入的count值初始化一个同步器,count值是其中的state值,在每次尝试获取锁时,先判断state值是否 == 0,
- 如果等于0,此时锁的剩余数量为0,无法获取共享锁
- 否则返回-1,执行doAcquireSharedInterruptibly(arg)方法,并开始尝试获取锁
- 在不断尝试获取锁的过程中,判断是否将当前线程挂起
下面看到countDown()方法
public void countDown() {
sync.releaseShared(1);
}
方法中调用自定义同步器的releaseShared()方法
此时回头看自定义同步器的releaseShared()方法,进入AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
看其中CountDownLatch自实现的tryReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
先进入一个无限循环,每调用一次tryReleaseShared()方法就将其中的state变量的值进行自减,直到将state值减为零时返回true .
回头看releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//返回true
doReleaseShared();
return true;
}
return false;
}
调用doReleaseShared()方法
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
//得到AQS队列的头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//得到node节点的waitStatus
if (ws == Node.SIGNAL) {
//通过CAS操作将当前node节点的ws值修改为0,0为一个node节点刚初始化的状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒该节点的后继节点,使它进入锁的竞争中
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
总结:
- 每次调用countDown方法时,将调用AQS的releaseShared()方法,
- 在releaseShared()方法中调用countDownLatch自定义同步器中的tryReleaseShared(),将state变量值不断进行自减,直到state值变为0
- 调用doReleaseShared()方法,唤醒后继节点.
最后
通过本篇文章我们知道了CountDownLatch的基本用法;并且还知道了CountDownLatch的功能实现主要是依靠AQS同步器,并在此基础上自实现了tryAcquireShared()方法和tryReleaseShared()方法
-
关于获取锁的方法:
在state变量自减为零时,等待的线程开始竞争锁
-
关于释放锁的方法
在调用countDown方法时,每次将state变量的值进行自减,直到state的值减为0时,唤醒等待的后续节点.
评论区