AQS源码解析(未完待续)
简介
AbstractQueuedSynchronizer从名字可以看到三个信息:抽象的、同步的队列。它提供了同步状态、阻塞与唤醒线程以及先进先出队列的基础框架。JDK中许多并发工具类的实现都基于AQS,如ReentrantLock。
1、重要的成员变量
和所有的队列一样,FIFO队列也有头节点head和尾节点tail。
state是同步器的状态,什么是同步器的状态呢?可以理解为锁冲入的次数,开始state值为0,没有线程获取锁。当某个线程尝试获取锁并且成功的时候,state的值变为1。其它线程想要获取锁时,发现state = 1,则进入同步队列阻塞。那么为什么不用boolean表示呢?因为AQS底层设计为可重入锁,如果一个线程获取锁的时候发现state > 0,但是currentThread是自己,则state加1,获取锁成功。当然释放锁的时候,state是多少就要释放多少次。
private transient volatile Node head; private transient volatile Node tail; // The synchronization state. private volatile int state;
2、构造方法
不重要
3、核心方法
3.1 内部类 Node()
我们发现,只要涉及到队列,基本上都有一个内部类Node()。但AQS的node类比较复杂,主要是有很多的变量,我们来一个个看它们具体代表什么含义。
SHARED: 表明该锁是共享锁。
EXCLUSIVE : 表明该锁是排他锁。
CANCELLED : 1;表明该节点被取消了。
SIGNAL : -1;表示该节点的后继节点处于阻塞状态,当该节点释放锁(releases)或者取消获取锁(cancels)时,需要唤醒他的后继节点。正常情况下,同步队列中的大部分节点的状态都为SIGNAL。
CONDITION :-2;表明该节点是在条件队列(condition queue)上等待,而不是同步队列。由于AQS不仅可以用来当锁(替代synchronized关键字),还具有替代Object的wait/notify的功能,如果一个线程调用了Condition.await(),该线程会加入到条件队列中(而不是同步队列),并将状态设置为CONDITION。
PROPAGATE : -3;共享模式下,由于可以有不只一个线程享用锁,前置节点不仅会唤醒他的后继节点,还可能会将唤醒操作进行“传播”,也就是会唤醒后继节点的后继节点。
waitStatus: 节点状态
nextWaiter: 1. 当该节点处于同步队列时(AQS自己本身的CLH队列),表明该节点是在竞争锁,如果是独占锁,那么nextWaiter=null,如果是共享锁,那么nextWaiter=new Node(),也就是说,nextWaiter此时仅仅相当于一个flag,用来表明该锁是独占锁还是共享锁。2. 当该节点处于条件队列时,nextWaiter指向他在队列中的后继节点。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
3.2 内部类 ConditionObject()
// 暂不讲解
3.3 获取锁核心方法 acquire()
acquire()一共做了四件事:
- 尝试获取锁。
- 获取锁失败,则调用addWaiter()将当前线程加入同步队列。
- acquireQueued(),死循环,一直尝试获取锁或直接阻塞。
- selfInterrupt(),中断。
下面详细介绍下acquire()在这期间具体做了些什么。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.3.1 tryAcquire(int arg)
这个方法用于子类重写,实现自己需要的获取锁的逻辑。
// AQS
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
下面是ReentractLock中非公平锁的对此方法的重写。简单解释下:
- 首先获取当前锁状态,如果state=0,则直接尝试CAS获取锁(突出一个非公平),获取成功则将exclusiveOwnerThread设为当前线程。
- 如果state != 0,则看持有锁的线程是否为当前线程,如果是的话,直接重入,setState(c + acquires) 即重入次数。
// ReentractLock
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
// 这里小于0是因为最高位为符号位。
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.3.2 addWaiter(Node mode)
为当前线程创建一个节点mode并插入到同步队列的末尾。当然tail尾节点不能为null,为null则说明队列还未初始化,需要enq(node)初始化。enq(node)里面是一个死循环,先校验是否初始化,然后和addWaiter()前半部分一样将mode节点插入末尾。
那么为什么不直接调用enq(node),反而在外面先尝试插入呢?源码中有相关注释,主要是在锁竞争不激烈的情况下,一次CAS基本就能成功插入,提升性能。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.3.3 acquireQueued(final Node node, int arg)
这个方法主要分为两个部分:
1、死循环,然后判断当前节点的上一个节点是否为头结点,如果是,则尝试获取锁,成功则返回。如果不是,则执行shouldParkAfterFailedAcquire(p, node)先检查并且判断该节点是否应该阻塞,如果为true,再执行parkAndCheckInterrupt()检查是否应该中断,返回中断标记。
2、如果获取锁失败,则取消获取锁(逻辑复杂,后续讲解)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3.4 selfInterrupt()
如果3.3.3中的返回值为true,则中断当前线程。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}