9、AQS-CountDownLatch(共享锁)
public class CountDownLatchDemo01 {
private static CountDownLatch lock = new CountDownLatch(3);
public static void test(){
try {
System.out.println(Thread.currentThread().getName() + "开始执行");
TimeUnit.SECONDS.sleep(2);
} catch (Exception e){
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "执行完成");
lock.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(() -> test(),"A").start();
new Thread(() -> test(),"B").start();
new Thread(() -> test(),"C").start();
lock.await();
System.out.println("啊哈!");
}
}
等A、B、C3个线程指向完后,才会打印:啊哈!
一、CountDownLatch(共享锁)源码分析
1.1、构造方法
public CountDownLatch(int count) {
// count小于0直接抛异常
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
1.2、Sync的实现
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
//获取当前State值
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
// state为0,返回1,表示获取锁成功。
// state不为0,则返回-1,表示获取锁失败。
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// 自旋
for (;;) {
// 获取当前State值
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//使用CAS修改state的值为nextc
if (compareAndSetState(c, nextc))
// nextc == 0,true时,表示没有线程获取锁了,表示当前线程lock.await()方法后的代码可以执行了。
//false,还有线程需要释放锁,不能执行当前线程lock.await()方法后的代码。
return nextc == 0;
}
}
}
1.3、await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly()该方法,尝试获取共享锁,若获取失败,则该线程将会被加入到AQS的同步队列中等待。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断当前线程释放被中断,若被中断,则直接抛出异常结束
if (Thread.interrupted())
throw new InterruptedException();
// 调用tryAcquireShared方法尝试获取锁(修改share的值),调用的是CountDownLatch已经重写过AQS的tryAcquireShared()方法
if (tryAcquireShared(arg) < 0)
// 如果线程获取锁失败,将会加入到AQS的同步队列中阻塞等待。
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly()方法将当前线程构造的node节点列队中,并通过自旋方式尝试获取锁。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 创建共享的Node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//自旋
for (;;) {
// 获取prev(前一个)节点,若为null即刻抛出NullPointException
final Node p = node.predecessor();
// 如果P==head(头结点)
if (p == head) {
// 调用tryAcquireShared方法获取share的值,若r<0,自旋继续
int r = tryAcquireShared(arg);
// r>=0,说明获取到锁。
if (r >= 0) {
//处理后续节点,将唤醒传递下去
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
//跳出for循环,停止自旋
return;
}
}
// 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志。
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 如果抛出异常/获取锁失败,进行出队(sync queue)操作
cancelAcquire(node);
}
}
/**
* AQS中的方法,判断一个争用锁的线程是否应该被阻塞
* @param pred 前继节点
* @param node 当前节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取waitStatus
int ws = pred.waitStatus;
// CANCELLED(1):取消状态
// SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态)
// CONDITION(-2):等待条件状态,在等待队列中
// PROPAGATE(3):状态需要向后传播
// 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark)
if (ws == Node.SIGNAL)
return true;
//前继节点状态为CANCELLED(取消状态)/ PROPAGATE(状态需要向后传播)
if (ws > 0) {
do {
// 则设置当前节点的prev(前一个节点)指向原先前继节点的prev(前一个节点)。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL(等待触发)状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 如果shouldParkAfterFailedAcquire()方法返回了true,则会执行:parkAndCheckInterrupt()方法。
* AQS中的方法,parkAndCheckInterrupt()是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
*/
private final boolean parkAndCheckInterrupt() {
// 将当前线程挂起到WATING状态
LockSupport.park(this);
return Thread.interrupted();
}
setHeadAndPropagate()方法用于传递状态给后续节点(进入这个方法说明前面的节点已经获得锁,并且执行完毕)。需要设置列队的头节点。
private void setHeadAndPropagate(Node node, int propagate) {
// 获取head(头节点)
Node h = head;
// 设置head(头节点)
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取当前节点的next(下一个)节点
Node s = node.next;
// 如果下一个节点为null/或者为shared共享节点,则释放锁。
if (s == null || s.isShared())
doReleaseShared();
}
}
1.4、countDown()方法
countDown()方法用于使计数器减一也就是将由AQS维护的同步状态值state值减1,其一般是执行任务的线程调用。调用countDown()释放同步状态,每次调用同步状态值-1
public void countDown() {
sync.releaseShared(1);
}
releaseShared()方法尝试释放锁,该方法是CountDownLatch中Sync内部类实现AQS的tryReleaseShared方法
public final boolean releaseShared(int arg) {
// 尝试释放锁,调用的是CountDownLatch中Sync内部类实现AQS的tryReleaseShared方法
if (tryReleaseShared(arg)) {
// 释放成功,唤醒后续等待的线程
doReleaseShared();
return true;
}
return false;
}
-
获取head(头节点),判断head(头节点)不为空,且不为tail(尾节点),说明等待队列中有等待唤醒的线程。在等待队列中,head(头节点)中并没有保存正在等待的线程,其只是一个空的Node节点,真正等待的线程是从头节点的下一个节点开始排队等待的。
-
在判断等待队列中有正在等待的线程之后,将head(头节点)的状态信息置为初始化状态,并且调用unparkSuccessor(Node)方法唤醒后继节点,使后续节点可以尝试去获取共享锁。
-
如果head(头节点)的waitStatus为0此时为初始状态,则将head(头节点)的waitStatus设置为PROPAGATE(-3),表示下一次同步状态的获取将会无条件的传播下去。
-
头节点没有被其他线程修改,则跳出循环。
private void doReleaseShared() {
for (;;) {
// 获取head(头节点)
Node h = head;
// head(头节点)不为null,并且不是尾节点
if (h != null && h != tail) {
// 获取status
int ws = h.waitStatus;
// CANCELLED(1):取消状态
// SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态)
// CONDITION(-2):等待条件状态,在等待队列中
// PROPAGATE(-3):状态需要向后传播
// 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark)
if (ws == Node.SIGNAL) {
// 利用CAS修改waitStatus值为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后续节点
unparkSuccessor(h);
}
// 如果status==0,则利用CAS修改waitStatus为PROPAGATE(-3)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果h == head(头节点),说明没有其他线程对head(头节点)进行修改,跳出自旋。
if (h == head) // loop if head changed
break;
}
}
版权声明
非特殊说明,本文由Zender原创或收集发布,欢迎转载。
ZENDER

发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。