AbstractQueuedSynchronizer源码解析

AQS源码解析(未完待续)

简介

AbstractQueuedSynchronizer从名字可以看到三个信息:抽象的、同步的队列。它提供了同步状态、阻塞与唤醒线程以及先进先出队列的基础框架。JDK中许多并发工具类的实现都基于AQS,如ReentrantLock。

1、重要的成员变量

和所有的队列一样,FIFO队列也有头节点head和尾节点tail。
state是同步器的状态,什么是同步器的状态呢?可以理解为锁冲入的次数,开始state值为0,没有线程获取锁。当某个线程尝试获取锁并且成功的时候,state的值变为1。其它线程想要获取锁时,发现state = 1,则进入同步队列阻塞。那么为什么不用boolean表示呢?因为AQS底层设计为可重入锁,如果一个线程获取锁的时候发现state > 0,但是currentThread是自己,则state加1,获取锁成功。当然释放锁的时候,state是多少就要释放多少次。

private transient volatile Node head;
private transient volatile Node tail;
// The synchronization state.
private volatile int state;

2、构造方法

不重要

3、核心方法

3.1 内部类 Node()

我们发现,只要涉及到队列,基本上都有一个内部类Node()。但AQS的node类比较复杂,主要是有很多的变量,我们来一个个看它们具体代表什么含义。
SHARED: 表明该锁是共享锁。
EXCLUSIVE : 表明该锁是排他锁。
CANCELLED : 1;表明该节点被取消了。

SIGNAL : -1;表示该节点的后继节点处于阻塞状态,当该节点释放锁(releases)或者取消获取锁(cancels)时,需要唤醒他的后继节点。正常情况下,同步队列中的大部分节点的状态都为SIGNAL。

CONDITION :-2;表明该节点是在条件队列(condition queue)上等待,而不是同步队列。由于AQS不仅可以用来当锁(替代synchronized关键字),还具有替代Object的wait/notify的功能,如果一个线程调用了Condition.await(),该线程会加入到条件队列中(而不是同步队列),并将状态设置为CONDITION。

PROPAGATE : -3;共享模式下,由于可以有不只一个线程享用锁,前置节点不仅会唤醒他的后继节点,还可能会将唤醒操作进行“传播”,也就是会唤醒后继节点的后继节点。

waitStatus: 节点状态
nextWaiter: 1. 当该节点处于同步队列时(AQS自己本身的CLH队列),表明该节点是在竞争锁,如果是独占锁,那么nextWaiter=null,如果是共享锁,那么nextWaiter=new Node(),也就是说,nextWaiter此时仅仅相当于一个flag,用来表明该锁是独占锁还是共享锁。2. 当该节点处于条件队列时,nextWaiter指向他在队列中的后继节点。

static final class Node {
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;
	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;
	volatile int waitStatus;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;
	Node nextWaiter;

	Node() {
 	}

	Node(Thread thread, Node mode) {
    	this.nextWaiter = mode;
    	this.thread = thread;
	}
	
	Node(Thread thread, int waitStatus) {
    	this.waitStatus = waitStatus;
    	this.thread = thread;
	}
}

3.2 内部类 ConditionObject()

// 暂不讲解

3.3 获取锁核心方法 acquire()

acquire()一共做了四件事:

  • 尝试获取锁。
  • 获取锁失败,则调用addWaiter()将当前线程加入同步队列。
  • acquireQueued(),死循环,一直尝试获取锁或直接阻塞。
  • selfInterrupt(),中断。

下面详细介绍下acquire()在这期间具体做了些什么。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

3.3.1 tryAcquire(int arg)
这个方法用于子类重写,实现自己需要的获取锁的逻辑。

// AQS
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

下面是ReentractLock中非公平锁的对此方法的重写。简单解释下:

  • 首先获取当前锁状态,如果state=0,则直接尝试CAS获取锁(突出一个非公平),获取成功则将exclusiveOwnerThread设为当前线程。
  • 如果state != 0,则看持有锁的线程是否为当前线程,如果是的话,直接重入,setState(c + acquires) 即重入次数。
// ReentractLock
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) 
        	// 这里小于0是因为最高位为符号位。
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.3.2 addWaiter(Node mode)
为当前线程创建一个节点mode并插入到同步队列的末尾。当然tail尾节点不能为null,为null则说明队列还未初始化,需要enq(node)初始化。enq(node)里面是一个死循环,先校验是否初始化,然后和addWaiter()前半部分一样将mode节点插入末尾。
那么为什么不直接调用enq(node),反而在外面先尝试插入呢?源码中有相关注释,主要是在锁竞争不激烈的情况下,一次CAS基本就能成功插入,提升性能。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

3.3.3 acquireQueued(final Node node, int arg)
这个方法主要分为两个部分:
1、死循环,然后判断当前节点的上一个节点是否为头结点,如果是,则尝试获取锁,成功则返回。如果不是,则执行shouldParkAfterFailedAcquire(p, node)先检查并且判断该节点是否应该阻塞,如果为true,再执行parkAndCheckInterrupt()检查是否应该中断,返回中断标记。
2、如果获取锁失败,则取消获取锁(逻辑复杂,后续讲解)。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3.4 selfInterrupt()
如果3.3.3中的返回值为true,则中断当前线程。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

3.4