AQS概念
AbstractQueuedSynchronizer是JUC中非常重要的一个同步控制工具
在JDK的并发包中,我们能看到很多很多的同步器类,比如ReentrantLock(可重入锁)、Semaphore(信号量)和CountDownLatch(倒计时闭锁)等等
在这些类中,都使用了AbstractQueuedSynchronizer(AQS)来构建
它们之间的继承关系如下图所示:
可以看出,在基于AQS构建的同步器类中,都继承了AbstractQueuedSynchronizer的子类,而且名字都叫做Sync
特别地,在ReentrantLock中Sync有两种实现类:NonfairSync和FairSync,分别实现了对不公平锁和公平锁的控制
AQS的概念,总共有三点:
- 状态信息:我们需要一个整数来保存当前的同步状态信息,这个信息在AbstractQueuedSynchronizer中由int属性state保存。它可以表示任意状态,比如ReentrantLock用它来表示线程已经重复获取锁的次数,Semaphore用它来表示剩余许可的数量
- 获取(aquire):获取锁或许可,通常会阻塞,直至状态信息处于可被获取的状态
- 释放(release):释放锁或许可,通常不会阻塞,释放操作会唤醒因获取而被阻塞的线程
AQS的内部结构
AQS在类内部维护了一个等待队列,这个队列叫作CLH队列。这个CLH队列实际上是一个双向链表,其示意图如下:
CLH队列的优点:
- 先进先出,可以保证公平性
- 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
- 采用了自旋锁思想,所以CLH队列也是一种基于链表的可扩展、高性能、公平的自旋锁
再来看一下AQS的UML图:
我们会发现:
-
链表的节点就是AQS内部的抽象类Node
-
AQS中的head就是指向队列的head节点,tail指向队列的尾节点
-
Node中有指向前节点的指针(prev)和指向后节点的指针(next),用waiter存放与此节点关联的线程,status则指明了线程的状态
可以看出,这个等待队列是存放着被阻塞的线程。在这个等待队列中的线程,会不断地尝试acquire,直到获取到锁或许可,改变状态信息为止
实际上,AQS内部不只是有一个等待队列,它为各个Condition对象都维护了一个条件等待队列
图中的ConditionObject就是一个Condition实现类,内部维护了firstWaiter和lastWaiter,指向队列的头和尾
当线程因为调用Condition类中的await()挂起时,它们会进入条件等待队列中。当它们被signal()或signalAll()唤醒时,会将他们转移到等待队列中,去尝试acquire
因为队列不一样,节点也不一样,因此Node节点派生了3个子类
- SharedNode:共享锁队列的节点
- ExclusiveNode:独占锁队列的节点
- ConditionNode:条件等待队列的节点
总而言之,AQS的结构如下图所示:
源码分析
让我们从一个简单的非公平ReentrantLock出发,分析AQS在加锁时的表现
lock()
首先调用ReentrantLock的lock()
public void lock() {
// 此处是调用了NonFairSync类的lock方法
// NonFairSync就是AQS在ReentrantLock中的实现类
sync.lock();
}
进入NonFairSync类的lock方法
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
// 首先尝试获取锁,如果没获取到,则进入acquire(1)
// 1为要获取的资源数,因为此处为独占锁,所以数量为1
acquire(1);
}
// 调用initialTryLock()尝试获取锁
// 这个方法排除了锁重入的情况,接下来的情况都是非重入情况
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) {
// 使用CAS设置state,尝试获取锁,如果成功,则设置当前线程为owner
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
// 如果是已经持有锁的线程再次获得锁,则记录重复加锁的数量
int c = getState() + 1;
// 如果重入次数发生溢出,则抛异常
if (c < 0)
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
进入AbstractQueuedSynchronizer的acquire(int arg)方法
实际上这个方法是对核心acquire方法的一层简单包装
public final void acquire(int arg) {
if (!tryAcquire(arg))
// 再次尝试获取锁,若失败则进入核心的线程同步方法acquire(),在这个方法中实现了对队列的管理
acquire(null, arg, false, false, false, 0L);
}
// 调用tryAcquire方法,再次尝试获取锁
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
接下来,我们将进入核心的acquire方法,这是AQS控制线程同步的核心流程,所有使用AQS的工具类最终都会调用此方法
在分析方法之前,我们先看一下节点有哪些状态(Node的status值)
// 节点处于等待状态,值为1(且必须为1)
static final int WAITING = 1;
// 节点被取消调度(被中断或Timeout),这个值必须为负数
static final int CANCELLED = 0x80000000;
// 节点处于条件等待队列中
static final int COND = 2;
关于acquire方法,官方文档中写道:
循环做以下几件事情:
检查节点是否为头节点
如果是,则保证head的稳定性,否则保证有效的前节点
如果节点是头节点或仍未入队,则尝试获取
如果节点还未创建,则创建节点
如果节点未入队,则尝试入队一次
如果线程从阻塞中被唤醒, 则再次尝试获取(直至指定的自旋时间)
如果WAITING状态未设置,则设置并重试
否则,阻塞当前线程,清除WAITING状态并检查是否有异常情况
我们可以对照官方文档,理清acquire方法的逻辑
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
// 自旋的时间
byte spins = 0, postSpins = 0;
// 中断标记,头节点标记
boolean interrupted = false, first = false;
// 在入队时的前一个节点
Node pred = null;
// 死循环,不断尝试获取锁
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred)))
// 节点在队列中,且不是头节点时
if (pred.status < 0) {
// 如果前一个节点被取消调度,则调用cleanQueue()方法
// 此方法的作用是反复遍历CLH队列,清除被取消调度的节点,并唤醒正处于等待的节点
cleanQueue();
continue;
else if (pred.prev == null) {
// 此处的代码笔者暂时也不是很清楚,似乎是节点突然间变成了头节点
// 所以调用onSpinWait()让出CPU使用权,使得头节点稳定下来(不了解)
Thread.onSpinWait();
continue;
}
}
if (first || pred == null) {
// 如果节点是头节点,或者还没入队,则尝试获取锁
// 注意,获取锁的顺序并不是严格按照队列的FIFO顺序,未入队的线程也可以尝试获取锁
// 此处就体现了非公平的理念
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
// 如果是头节点获取到锁,则此节点成为新的head
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
// 如果成功获取到锁,则返回1,所有获取到锁的线程都会从这里return回去
return 1;
}
}
if (node == null) {
// 如果节点仍未创建,则创建节点
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) {
// 如果节点未入队,则尝试入队
node.waiter = current;
Node t = tail;
// 将前一个节点和当前节点联系起来
node.setPrevRelaxed(t);
if (t == null)
// 如果队列都没创建,则需要创建队列,初始化head
tryInitializeHead();
else if (!casTail(t, node))
// 利用CAS操作更新tail为当前节点
// 如果更新失败,则将当前节点的prev更新成null,在下一次循环中重新进行此操作
node.setPrevRelaxed(null); // back out
else
// 更新成功,把原tail的next设置为当前节点
t.next = node;
} else if (first && spins != 0) {
// 线程被唤醒之后,在自旋时间内持续尝试获取锁
--spins;
// 让出CPU使用权
Thread.onSpinWait();
} else if (node.status == 0) {
// 如果节点状态未设置,则设置status为WAITING(1)
node.status = WAITING;
} else {
long nanos;
// 设置自旋时间
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
// 如果没有设置阻塞时间,则阻塞该线程
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
// 如果设置了阻塞时间则阻塞指定时间
LockSupport.parkNanos(this, nanos);
else
break;
// 清除状态
node.clearStatus();
// 如果有中断到来,则直接跳出循环
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
// 跳出循环说明等待被中断了
return cancelAcquire(node, interrupted, interruptible);
}
阅读完源码我们可以发现,所有被锁阻塞的线程,都是阻塞在了这个核心的acquire方法中,不断地进行着一个死循环,当获得锁(即CAS改变AQS中的state)时,才能从acquire方法返回,进入到临界区中
当然,笔者只是给出了一个阅读源码的思路,其中的很多细节,笔者也没有完全搞懂,比如:
- 调用onSpinWait()的意义何在?
- byte类型的spins和postSpins变量,究竟有什么作用?
- 官方文档中的head的稳定性(ensure head stable),真正的含义是什么?
- …
Doug Lea的代码精妙至极,奈何笔者才疏学浅,疑惑之处甚多,还望各位朋友指点一二
unlock()
看完了加锁的源码,释放锁的源码就比较简单了,很轻松就能看懂
首先进入ReentrantLock的unlock方法
public void unlock() {
// 将AQS的资源释放
sync.release(1);
}
接着进入AbstractQueuedSynchronizer的release方法,以释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 释放资源后,还需唤醒下一个节点
signalNext(head);
return true;
}
return false;
}
// 尝试释放锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 现获取的资源数-要释放的资源数
int c = getState() - releases;
// 若释放锁的线程和持有锁的线程不一样,则抛出异常
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
// free表示资源没有被线程占用
setExclusiveOwnerThread(null);
setState(c);
return free;
}
最后,调用signalNext方法唤醒下一个线程
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
// 取消下一个节点的WAITING状态
s.getAndUnsetStatus(WAITING);
// 唤醒下一个节点
LockSupport.unpark(s.waiter);
}
}
总结
AQS作为一个线程同步工具,其核心特点为:
-
维护一个状态量,线程可以获取与释放此状态量,来达到获取锁和释放锁的目的
-
维护一个等待队列(CLH队列),所有未获取到状态量的线程在其中排队,不断地尝试获取状态量
-
对于每一个条件对象(Condition Object),也维护一个条件等待队列,条件达成后,节点将进入正常的等待队列
-
获取锁后,节点被移出队列;释放锁后,节点会唤醒下一个节点