8、AQS-ReentrantLock

烟雨 5年前 (2021-06-13) 阅读数 384 #Java并发
文章标签 并发Java
Java除了使用关键字synchronized加锁外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
ReentrantLock与synchronized区别:
  1. synchronized是独占锁,加锁、解锁过程自动进行(不能人工干预,不够灵活)。ReentrantLock也是独占锁,加锁和解锁的过程手动进行(非常灵活)。

  2. synchronized可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁。ReentrantLock也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。

  3. synchronized不可响应中断,一个线程获取不到锁就一直等着。ReentrantLock可以相应中断。

一、ReentrantLock类图

image.png

二、ReentrantLock时序图

image.png

从图上可以看出来,当锁获取失败时,会调用addWaiter()方法将当前线程封装成Node节点加入到AQS队列。

三、ReentrantLock(重入锁)源码分析

3.1、构造方法

/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock()默认构造方法,创建的是NonfairSync类,代表非公平锁。
ReentrantLock(boolean fair)构造方法,根据fair值创建公平锁(FairSync)、非公平锁(NonfairSync)

3.2、抽象静态内部类Sync

Sync继承了AQS,实现了AQS需要实现的方法。
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
		 * 尝试获取锁,如果成功返回true,不成功返回false
         * Sync重写了AQS类中的nonfairTryAcquire方法
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 获得当前执行的线程
            final Thread current = Thread.currentThread();
            // 获取state的值
            int c = getState();
            // state==0 表示无锁状态
            if (c == 0) {
                // 通过CAS操作来修改state状态,初始值0,修改值acquires
                if (compareAndSetState(0, acquires)) {
                    // 保存当前获得锁的线程
                    setExclusiveOwnerThread(current);
                    // 成功获取锁
                    return true;
                }
            }
            // 如果是同一个线程来获得锁,则直接增加重入次数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            // state值-1
            int c = getState() - releases;
            // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // state=0说明没有现在占该用锁,一旦释放锁,其他线程将能获取锁。
            if (c == 0) {
                free = true;
                // 清除锁的持有线程标记
                setExclusiveOwnerThread(null);
            }
            // 更新state值
            setState(c);
            return free;
        }
    
		// 判断当前线程是否正在独占资源,独占则返回true
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
		// 创建条件对象Condition
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class
		// 返回目前拥有此锁的线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
    
		// 查询当前线程保持锁定的个数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    
		// 查询此锁定是否由任意线程保持
        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         * 序列化
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

3.3、静态内部类FairSync、NonfairSync

非公平锁实现类NonfairSync
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            // 通过CAS操作来修改state状态,表示争抢锁的操作。初始值0,修改值1
            if (compareAndSetState(0, 1))
                // 设置当前获得锁状态的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 修改state状态失败,则尝试去获取锁
                acquire(1);
        }
    	/**
		 * 尝试独占方式获取锁,如果成功返回true,不成功返回false
         * Sync重写了AQS类中的nonfairTryAcquire方法
         */
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

公平锁实现类FairSync

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         * 尝试独占方式获取锁,如果成功返回true,不成功返回false
         */
        protected final boolean tryAcquire(int acquires) {
            // 获得当前执行的线程
            final Thread current = Thread.currentThread();
            // 获取state的值
            int c = getState();
            // state==0 表示无锁状态
            if (c == 0) {
                // 如果没有排队获取锁的,就开始获取锁
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果是同一个线程来获得锁,则直接增加重入次数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

3.4、加锁步骤-AQS的acquire()方法

公平锁实现类FairSync调用lock进行加锁操作,里面只有acquire()一个方法。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这个方法的主要逻辑是:

  • 通过tryAcquire()尝试获取锁,如果成功返回true,失败返回false

  • 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部。

  • private Node addWaiter(Node mode) {
        // 将当前线程封装成Node,并且mode为独占锁
        Node node = new Node(Thread.currentThread(), mode); 
        // Try the fast path of enq; backup to full enq on failure
        // tail(尾节点)是AQS的中表示同步队列队尾的属性。
        Node pred = tail;
        // tail(尾节点)不为空的情况,说明队列中存在节点数据
        if (pred != null) { 
            // 将当前线程的Node的prev(前一个)节点指向tail(尾节点)
            node.prev = pred;
            // 通过CAS将node添加到AQS队列
            if (compareAndSetTail(pred, node)) {
                // CAS成功,把旧的tail(尾节点)的next(下一个)指针指向新的tail(尾节点)
                pred.next = node;
                return node;
            }
        }
        // tail==null时,将node添加到同步队列中
        enq(node); 
        return node;
    }
  • enq()这个方法的最外层是一个大的for循环(死循环)。出口返回条件只有一个:node成功加入到链表的末尾。上一个方法中tail==null(链表为null)时,先判断t是否为null,如果为null,实例化一个空的Node节点,并且tail和head都指向这个空的Node,如果不为空,将node加入到链表末尾。

  • private Node enq(final Node node) {
        //自旋
        for (;;) {
            Node t = tail;
            if (t == null) {
                // CAS的方式创建一个空的Node作为头结点
                if (compareAndSetHead(new Node()))
                    // 此时队列中只一个头结点,所以t也指向它
                    tail = head;
            } else {
                // 将node的prev(前一个节点)指向t
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 将t的next(下一个节点)指向node
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • acquireQueued()方法,将Node作为参数,通过自旋去尝试获取锁。大致步骤:

    1. 获取当前节点的prev(前一个)节点。

    2. 如果prev(前一个)节点为head(头)节点,那么它就有资格去争抢锁,调用tryAcquire()方法抢占锁。

    3. 抢占锁成功以后,把获得锁的节点设置为head(头),并且移除原来的初始化head(头)节点。

    4. 如果获得锁失败,则根据waitStatus决定是否需要挂起线程。

    5. 最后,通过cancelAcquire取消获得锁的操作。

  • final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取prev(前一个)节点,若为null即刻抛出NullPointException
                final Node p = node.predecessor();
                // 如果prev(前一个)节点为head才有资格进行锁的抢夺
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点。
                    setHead(node);
                    p.next = null; // help GC
                    // 获取锁成功
                    failed = false;
                    return interrupted;
                }
                // 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // 如果抛出异常/获取锁失败,进行出队(sync queue)操作
                cancelAcquire(node);
        }
    }
    
    // 设置head节点,head.thread与head.prev为null
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    /**
    * AQS中的方法,判断一个争用锁的线程是否应该被阻塞
    * @param pred 前继节点
    * @param node 当前节点
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取waitStatus
        int ws = pred.waitStatus;
        // CANCELLED(1):取消状态
    	// SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态)
    	// CONDITION(-2):等待条件状态,在等待队列中
    	// PROPAGATE(-3):状态需要向后传播
        
        // 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark)
        if (ws == Node.SIGNAL)
            return true;
        //前继节点状态为CANCELLED(取消状态)/ PROPAGATE(状态需要向后传播)
        if (ws > 0) {
            do {
                // 则设置当前节点的prev(前一个节点)指向原先前继节点的prev(前一个节点)。
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //  如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL(等待触发)状态。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    /**
    * 如果shouldParkAfterFailedAcquire()方法返回了true,则会执行:parkAndCheckInterrupt()方法。
    * AQS中的方法,parkAndCheckInterrupt()是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
    */
    private final boolean parkAndCheckInterrupt() {
        // 将当前线程挂起到WATING状态
        LockSupport.park(this);
        return Thread.interrupted();
    }
  • parkAndCheckInterrupt()方法中的LockSupport类是Java6引入的一个类,是一个线程工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,也可以在任意位置唤醒,它的内部其实两类主要的方法:park(阻塞线程)和unpark(唤醒线程)。LockSupport实际上是调用了Unsafe类里的函数。

  • public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);、//调用的native方法:public native void unpark(Thread jthread);  
    }
    
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);// 调用的native方法:public native void park(boolean isAbsolute, long time);  
        setBlocker(t, null);
    }
  • selfInterrupt()方法,产生一个中断。

  • private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

3.5、释放锁步骤-unlock()方法

public void unlock() {
    sync.release(1);
}

unlock()方法调用了内部类sync的release()方法。

public final boolean release(int arg) {
    //尝试放锁(state-1),若释放后锁可被其他线程获取(state=0),返回true
    if (tryRelease(arg)) {
        Node h = head;
        // 当前队列不为空,且头结点状态不为初始化状态0,唤醒同步队列中被阻塞的线程
        if (h != null && h.waitStatus != 0)
            // 唤醒同步队列中被阻塞的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release()方法第一步调用tryRelease(),尝试去释放锁(state-1)

protected final boolean tryRelease(int releases) {
    // state值-1
    int c = getState() - releases;
    // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // state=0说明没有现在占该用锁,一旦释放锁,其他线程将能获取锁。
    if (c == 0) {
        free = true;
        // 清除锁的持有线程标记
        setExclusiveOwnerThread(null);
    }
    // 更新state值
    setState(c);
    return free;
}

如果释放锁成功,进入if里面,判断当前队列h不为空,且头结点状态不为初始化状态0,调用unparkSuccessor()唤醒同步队列中被阻塞的线程。由于前面tryRelease()方法已经把head(头)节点释放了,需要唤醒列队中下一个节点。

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
    	// status小于0
        if (ws < 0)
            //通过CAS 设置status为0
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
    	// 判断后继节点是否为空或者是否是取消状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 循环从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点。
            // 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化。
            // 在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //如果获取到的节点不为空,则直接通过LockSupport.unpark()方法来唤醒线程,这样一来将会有一个节点唤醒后,该节点会调用tryAcquire()方法来尝试获取锁。
        if (s != null)
            // 唤醒线程
            LockSupport.unpark(s.thread);
    }
版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门