9、AQS-CountDownLatch(共享锁)

烟雨 5年前 (2021-06-13) 阅读数 461 #Java并发
文章标签 并发Java
前面分析了AQS-ReentrantLock(重入锁),接着分析CountDownLatch(共享锁)
CountDownLatch(共享锁)也是是同步工具类之一,可以指定一个计数值(count),在并发环境下由线程调用countDown方法进行减1操作,当计数值变为0之后,被调用await方法阻塞的线程将会唤醒,实现线程间的同步。
public class CountDownLatchDemo01 {
    private static CountDownLatch lock = new CountDownLatch(3);

    public static void test(){
        try {
            System.out.println(Thread.currentThread().getName() + "开始执行");
            TimeUnit.SECONDS.sleep(2);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "执行完成");
            lock.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> test(),"A").start();
        new Thread(() -> test(),"B").start();
        new Thread(() -> test(),"C").start();
        lock.await();
        System.out.println("啊哈!");
    }
}

等A、B、C3个线程指向完后,才会打印:啊哈!

image.png

一、CountDownLatch(共享锁)源码分析

1.1、构造方法

public CountDownLatch(int count) {
    // count小于0直接抛异常
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
CountDownLatch(共享)和ReentrantLock(重入锁)一样,内部使用Sync继承AQS。通过构造函数传递计数值给Sync,并且设置了state。

1.2、Sync的实现

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }
	//获取当前State值
    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        // state为0,返回1,表示获取锁成功。
        // state不为0,则返回-1,表示获取锁失败。
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // 自旋
        for (;;) {
            // 获取当前State值
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            //使用CAS修改state的值为nextc
            if (compareAndSetState(c, nextc))
                // nextc == 0,true时,表示没有线程获取锁了,表示当前线程lock.await()方法后的代码可以执行了。
                //false,还有线程需要释放锁,不能执行当前线程lock.await()方法后的代码。
                return nextc == 0;
        }
    }
}

1.3、await()方法

await方法,直接调用了AQS的acquireSharedInterruptibly()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly()该方法,尝试获取共享锁,若获取失败,则该线程将会被加入到AQS的同步队列中等待。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判断当前线程释放被中断,若被中断,则直接抛出异常结束
    if (Thread.interrupted())
        throw new InterruptedException();
    // 调用tryAcquireShared方法尝试获取锁(修改share的值),调用的是CountDownLatch已经重写过AQS的tryAcquireShared()方法
    if (tryAcquireShared(arg) < 0)
        // 如果线程获取锁失败,将会加入到AQS的同步队列中阻塞等待。
        doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly()方法将当前线程构造的node节点列队中,并通过自旋方式尝试获取锁。

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 创建共享的Node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //自旋
        for (;;) {
            // 获取prev(前一个)节点,若为null即刻抛出NullPointException
            final Node p = node.predecessor();
            // 如果P==head(头结点)
            if (p == head) {
                // 调用tryAcquireShared方法获取share的值,若r<0,自旋继续
                int r = tryAcquireShared(arg);
                // r>=0,说明获取到锁。
                if (r >= 0) {
                    //处理后续节点,将唤醒传递下去
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    //跳出for循环,停止自旋
                    return;
                }
            }
            // 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志。
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 如果抛出异常/获取锁失败,进行出队(sync queue)操作
            cancelAcquire(node);
    }
}

/**
* AQS中的方法,判断一个争用锁的线程是否应该被阻塞
* @param pred 前继节点
* @param node 当前节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取waitStatus
    int ws = pred.waitStatus;
    // CANCELLED(1):取消状态
	// SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态)
	// CONDITION(-2):等待条件状态,在等待队列中
	// PROPAGATE(3):状态需要向后传播
    
    // 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark)
    if (ws == Node.SIGNAL)
        return true;
    //前继节点状态为CANCELLED(取消状态)/ PROPAGATE(状态需要向后传播)
    if (ws > 0) {
        do {
            // 则设置当前节点的prev(前一个节点)指向原先前继节点的prev(前一个节点)。
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //  如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL(等待触发)状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
* 如果shouldParkAfterFailedAcquire()方法返回了true,则会执行:parkAndCheckInterrupt()方法。
* AQS中的方法,parkAndCheckInterrupt()是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
*/
private final boolean parkAndCheckInterrupt() {
    // 将当前线程挂起到WATING状态
    LockSupport.park(this);
    return Thread.interrupted();
}

setHeadAndPropagate()方法用于传递状态给后续节点(进入这个方法说明前面的节点已经获得锁,并且执行完毕)。需要设置列队的头节点。

private void setHeadAndPropagate(Node node, int propagate) {
    	// 获取head(头节点)
        Node h = head; 
    	// 设置head(头节点)
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 获取当前节点的next(下一个)节点
            Node s = node.next;
            // 如果下一个节点为null/或者为shared共享节点,则释放锁。
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

1.4、countDown()方法

countDown()方法用于使计数器减一也就是将由AQS维护的同步状态值state值减1,其一般是执行任务的线程调用。调用countDown()释放同步状态,每次调用同步状态值-1

public void countDown() {
    sync.releaseShared(1);
}

releaseShared()方法尝试释放锁,该方法是CountDownLatch中Sync内部类实现AQS的tryReleaseShared方法

public final boolean releaseShared(int arg) {
    // 尝试释放锁,调用的是CountDownLatch中Sync内部类实现AQS的tryReleaseShared方法
    if (tryReleaseShared(arg)) {
        // 释放成功,唤醒后续等待的线程
        doReleaseShared();
        return true;
    }
    return false;
}
doReleaseShared()方法唤醒后续线程。流程如下:
  1. 获取head(头节点),判断head(头节点)不为空,且不为tail(尾节点),说明等待队列中有等待唤醒的线程。在等待队列中,head(头节点)中并没有保存正在等待的线程,其只是一个空的Node节点,真正等待的线程是从头节点的下一个节点开始排队等待的。

  2. 在判断等待队列中有正在等待的线程之后,将head(头节点)的状态信息置为初始化状态,并且调用unparkSuccessor(Node)方法唤醒后继节点,使后续节点可以尝试去获取共享锁。

  3. 如果head(头节点)的waitStatus为0此时为初始状态,则将head(头节点)的waitStatus设置为PROPAGATE(-3),表示下一次同步状态的获取将会无条件的传播下去。

  4. 头节点没有被其他线程修改,则跳出循环。

private void doReleaseShared() {
        for (;;) {
            // 获取head(头节点)
            Node h = head;
            // head(头节点)不为null,并且不是尾节点
            if (h != null && h != tail) {
                // 获取status
                int ws = h.waitStatus;
                // CANCELLED(1):取消状态
                // SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态)
                // CONDITION(-2):等待条件状态,在等待队列中
                // PROPAGATE(-3):状态需要向后传播

                // 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark)
                if (ws == Node.SIGNAL) {
                    // 利用CAS修改waitStatus值为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒后续节点
                    unparkSuccessor(h);
                }
                // 如果status==0,则利用CAS修改waitStatus为PROPAGATE(-3)
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果h == head(头节点),说明没有其他线程对head(头节点)进行修改,跳出自旋。
            if (h == head)                   // loop if head changed
                break;
        }
    }

参考:https://www.jianshu.com/p/ddcc8aea4030

版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门