AQS源码解析(未完待续)
简介
AbstractQueuedSynchronizer从名字可以看到三个信息:抽象的、同步的队列。它提供了同步状态、阻塞与唤醒线程以及先进先出队列的基础框架。JDK中许多并发工具类的实现都基于AQS,如ReentrantLock。
1、重要的成员变量
和所有的队列一样,FIFO队列也有头节点head和尾节点tail。
state是同步器的状态,什么是同步器的状态呢?可以理解为锁冲入的次数,开始state值为0,没有线程获取锁。当某个线程尝试获取锁并且成功的时候,state的值变为1。其它线程想要获取锁时,发现state = 1,则进入同步队列阻塞。那么为什么不用boolean表示呢?因为AQS底层设计为可重入锁,如果一个线程获取锁的时候发现state > 0,但是currentThread是自己,则state加1,获取锁成功。当然释放锁的时候,state是多少就要释放多少次。
private transient volatile Node head; private transient volatile Node tail; // The synchronization state. private volatile int state;
2、构造方法
不重要
3、核心方法
3.1 内部类 Node()
我们发现,只要涉及到队列,基本上都有一个内部类Node()。但AQS的node类比较复杂,主要是有很多的变量,我们来一个个看它们具体代表什么含义。
SHARED: 表明该锁是共享锁。
EXCLUSIVE : 表明该锁是排他锁。
CANCELLED : 1;表明该节点被取消了。
SIGNAL : -1;表示该节点的后继节点处于阻塞状态,当该节点释放锁(releases)或者取消获取锁(cancels)时,需要唤醒他的后继节点。正常情况下,同步队列中的大部分节点的状态都为SIGNAL。
CONDITION :-2;表明该节点是在条件队列(condition queue)上等待,而不是同步队列。由于AQS不仅可以用来当锁(替代synchronized关键字),还具有替代Object的wait/notify的功能,如果一个线程调用了Condition.await(),该线程会加入到条件队列中(而不是同步队列),并将状态设置为CONDITION。
PROPAGATE : -3;共享模式下,由于可以有不只一个线程享用锁,前置节点不仅会唤醒他的后继节点,还可能会将唤醒操作进行“传播”,也就是会唤醒后继节点的后继节点。
waitStatus: 节点状态
nextWaiter: 1. 当该节点处于同步队列时(AQS自己本身的CLH队列),表明该节点是在竞争锁,如果是独占锁,那么nextWaiter=null,如果是共享锁,那么nextWaiter=new Node(),也就是说,nextWaiter此时仅仅相当于一个flag,用来表明该锁是独占锁还是共享锁。2. 当该节点处于条件队列时,nextWaiter指向他在队列中的后继节点。
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
3.2 内部类 ConditionObject()
// 暂不讲解
3.3 获取锁核心方法 acquire()
acquire()一共做了四件事:
- 尝试获取锁。
- 获取锁失败,则调用addWaiter()将当前线程加入同步队列。
- acquireQueued(),死循环,一直尝试获取锁或直接阻塞。
- selfInterrupt(),中断。
下面详细介绍下acquire()在这期间具体做了些什么。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
3.3.1 tryAcquire(int arg)
这个方法用于子类重写,实现自己需要的获取锁的逻辑。
// AQS protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
下面是ReentractLock中非公平锁的对此方法的重写。简单解释下:
- 首先获取当前锁状态,如果state=0,则直接尝试CAS获取锁(突出一个非公平),获取成功则将exclusiveOwnerThread设为当前线程。
- 如果state != 0,则看持有锁的线程是否为当前线程,如果是的话,直接重入,setState(c + acquires) 即重入次数。
// ReentractLock protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // 这里小于0是因为最高位为符号位。 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
3.3.2 addWaiter(Node mode)
为当前线程创建一个节点mode并插入到同步队列的末尾。当然tail尾节点不能为null,为null则说明队列还未初始化,需要enq(node)初始化。enq(node)里面是一个死循环,先校验是否初始化,然后和addWaiter()前半部分一样将mode节点插入末尾。
那么为什么不直接调用enq(node),反而在外面先尝试插入呢?源码中有相关注释,主要是在锁竞争不激烈的情况下,一次CAS基本就能成功插入,提升性能。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
3.3.3 acquireQueued(final Node node, int arg)
这个方法主要分为两个部分:
1、死循环,然后判断当前节点的上一个节点是否为头结点,如果是,则尝试获取锁,成功则返回。如果不是,则执行shouldParkAfterFailedAcquire(p, node)先检查并且判断该节点是否应该阻塞,如果为true,再执行parkAndCheckInterrupt()检查是否应该中断,返回中断标记。
2、如果获取锁失败,则取消获取锁(逻辑复杂,后续讲解)。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3.3.4 selfInterrupt()
如果3.3.3中的返回值为true,则中断当前线程。
static void selfInterrupt() { Thread.currentThread().interrupt(); }