8、AQS-ReentrantLock
-
synchronized是独占锁,加锁、解锁过程自动进行(不能人工干预,不够灵活)。ReentrantLock也是独占锁,加锁和解锁的过程手动进行(非常灵活)。
-
synchronized可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁。ReentrantLock也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
-
synchronized不可响应中断,一个线程获取不到锁就一直等着。ReentrantLock可以相应中断。
一、ReentrantLock类图
二、ReentrantLock时序图
三、ReentrantLock(重入锁)源码分析
3.1、构造方法
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
3.2、抽象静态内部类Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* 尝试获取锁,如果成功返回true,不成功返回false
* Sync重写了AQS类中的nonfairTryAcquire方法
*/
final boolean nonfairTryAcquire(int acquires) {
// 获得当前执行的线程
final Thread current = Thread.currentThread();
// 获取state的值
int c = getState();
// state==0 表示无锁状态
if (c == 0) {
// 通过CAS操作来修改state状态,初始值0,修改值acquires
if (compareAndSetState(0, acquires)) {
// 保存当前获得锁的线程
setExclusiveOwnerThread(current);
// 成功获取锁
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// state值-1
int c = getState() - releases;
// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state=0说明没有现在占该用锁,一旦释放锁,其他线程将能获取锁。
if (c == 0) {
free = true;
// 清除锁的持有线程标记
setExclusiveOwnerThread(null);
}
// 更新state值
setState(c);
return free;
}
// 判断当前线程是否正在独占资源,独占则返回true
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 创建条件对象Condition
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
// 返回目前拥有此锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 查询当前线程保持锁定的个数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 查询此锁定是否由任意线程保持
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
* 序列化
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
3.3、静态内部类FairSync、NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 通过CAS操作来修改state状态,表示争抢锁的操作。初始值0,修改值1
if (compareAndSetState(0, 1))
// 设置当前获得锁状态的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 修改state状态失败,则尝试去获取锁
acquire(1);
}
/**
* 尝试独占方式获取锁,如果成功返回true,不成功返回false
* Sync重写了AQS类中的nonfairTryAcquire方法
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
公平锁实现类FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
* 尝试独占方式获取锁,如果成功返回true,不成功返回false
*/
protected final boolean tryAcquire(int acquires) {
// 获得当前执行的线程
final Thread current = Thread.currentThread();
// 获取state的值
int c = getState();
// state==0 表示无锁状态
if (c == 0) {
// 如果没有排队获取锁的,就开始获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
3.4、加锁步骤-AQS的acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的主要逻辑是:
-
通过tryAcquire()尝试获取锁,如果成功返回true,失败返回false
-
如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部。
-
private Node addWaiter(Node mode) { // 将当前线程封装成Node,并且mode为独占锁 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail(尾节点)是AQS的中表示同步队列队尾的属性。 Node pred = tail; // tail(尾节点)不为空的情况,说明队列中存在节点数据 if (pred != null) { // 将当前线程的Node的prev(前一个)节点指向tail(尾节点) node.prev = pred; // 通过CAS将node添加到AQS队列 if (compareAndSetTail(pred, node)) { // CAS成功,把旧的tail(尾节点)的next(下一个)指针指向新的tail(尾节点) pred.next = node; return node; } } // tail==null时,将node添加到同步队列中 enq(node); return node; } -
enq()这个方法的最外层是一个大的for循环(死循环)。出口返回条件只有一个:node成功加入到链表的末尾。上一个方法中tail==null(链表为null)时,先判断t是否为null,如果为null,实例化一个空的Node节点,并且tail和head都指向这个空的Node,如果不为空,将node加入到链表末尾。
-
private Node enq(final Node node) { //自旋 for (;;) { Node t = tail; if (t == null) { // CAS的方式创建一个空的Node作为头结点 if (compareAndSetHead(new Node())) // 此时队列中只一个头结点,所以t也指向它 tail = head; } else { // 将node的prev(前一个节点)指向t node.prev = t; if (compareAndSetTail(t, node)) { // 将t的next(下一个节点)指向node t.next = node; return t; } } } } -
acquireQueued()方法,将Node作为参数,通过自旋去尝试获取锁。大致步骤:
-
获取当前节点的prev(前一个)节点。
-
如果prev(前一个)节点为head(头)节点,那么它就有资格去争抢锁,调用tryAcquire()方法抢占锁。
-
抢占锁成功以后,把获得锁的节点设置为head(头),并且移除原来的初始化head(头)节点。
-
如果获得锁失败,则根据waitStatus决定是否需要挂起线程。
-
最后,通过cancelAcquire取消获得锁的操作。
-
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取prev(前一个)节点,若为null即刻抛出NullPointException final Node p = node.predecessor(); // 如果prev(前一个)节点为head才有资格进行锁的抢夺 if (p == head && tryAcquire(arg)) { // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点。 setHead(node); p.next = null; // help GC // 获取锁成功 failed = false; return interrupted; } // 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire(p, node) && // 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 如果抛出异常/获取锁失败,进行出队(sync queue)操作 cancelAcquire(node); } } // 设置head节点,head.thread与head.prev为null private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * AQS中的方法,判断一个争用锁的线程是否应该被阻塞 * @param pred 前继节点 * @param node 当前节点 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取waitStatus int ws = pred.waitStatus; // CANCELLED(1):取消状态 // SIGNA(-1):等待触发状态,前节点可能是head/状态为CANCELLED(取消状态) // CONDITION(-2):等待条件状态,在等待队列中 // PROPAGATE(-3):状态需要向后传播 // 如果是SIGNAL状态,意味着当前线程需要被唤醒(unpark) if (ws == Node.SIGNAL) return true; //前继节点状态为CANCELLED(取消状态)/ PROPAGATE(状态需要向后传播) if (ws > 0) { do { // 则设置当前节点的prev(前一个节点)指向原先前继节点的prev(前一个节点)。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL(等待触发)状态。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 如果shouldParkAfterFailedAcquire()方法返回了true,则会执行:parkAndCheckInterrupt()方法。 * AQS中的方法,parkAndCheckInterrupt()是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。 */ private final boolean parkAndCheckInterrupt() { // 将当前线程挂起到WATING状态 LockSupport.park(this); return Thread.interrupted(); } -
parkAndCheckInterrupt()方法中的LockSupport类是Java6引入的一个类,是一个线程工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,也可以在任意位置唤醒,它的内部其实两类主要的方法:park(阻塞线程)和unpark(唤醒线程)。LockSupport实际上是调用了Unsafe类里的函数。
-
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread);、//调用的native方法:public native void unpark(Thread jthread); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L);// 调用的native方法:public native void park(boolean isAbsolute, long time); setBlocker(t, null); } -
selfInterrupt()方法,产生一个中断。
-
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
3.5、释放锁步骤-unlock()方法
public void unlock() {
sync.release(1);
}
unlock()方法调用了内部类sync的release()方法。
public final boolean release(int arg) {
//尝试放锁(state-1),若释放后锁可被其他线程获取(state=0),返回true
if (tryRelease(arg)) {
Node h = head;
// 当前队列不为空,且头结点状态不为初始化状态0,唤醒同步队列中被阻塞的线程
if (h != null && h.waitStatus != 0)
// 唤醒同步队列中被阻塞的线程
unparkSuccessor(h);
return true;
}
return false;
}
release()方法第一步调用tryRelease(),尝试去释放锁(state-1)
protected final boolean tryRelease(int releases) {
// state值-1
int c = getState() - releases;
// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state=0说明没有现在占该用锁,一旦释放锁,其他线程将能获取锁。
if (c == 0) {
free = true;
// 清除锁的持有线程标记
setExclusiveOwnerThread(null);
}
// 更新state值
setState(c);
return free;
}
如果释放锁成功,进入if里面,判断当前队列h不为空,且头结点状态不为初始化状态0,调用unparkSuccessor()唤醒同步队列中被阻塞的线程。由于前面tryRelease()方法已经把head(头)节点释放了,需要唤醒列队中下一个节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// status小于0
if (ws < 0)
//通过CAS 设置status为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 判断后继节点是否为空或者是否是取消状态
if (s == null || s.waitStatus > 0) {
s = null;
// 循环从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点。
// 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化。
// 在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果获取到的节点不为空,则直接通过LockSupport.unpark()方法来唤醒线程,这样一来将会有一个节点唤醒后,该节点会调用tryAcquire()方法来尝试获取锁。
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
版权声明
非特殊说明,本文由Zender原创或收集发布,欢迎转载。
ZENDER

发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。