AQS 是个啥?

啥?你连 AQS 是啥都不知道?

如果想要精通 Java 并发的话, AQS 是一定要掌握的。今天跟着阿粉一起搞一搞

基本概念

AQS 是 AbstractQueuedSynchronizer 的简称,翻译成中文就是 抽象队列同步器 ,这三个单词分开来看:

  • Abstract (抽象):也就是说, AQS 是一个抽象类,只实现一些主要的逻辑,有些方法推迟到子类实现

  • Queued (队列):队列有啥特征呢?先进先出( FIFO )对吧?也就是说, AQS 是用先进先出队列来存储数据的

  • Synchronizer (同步):即 AQS 实现同步功能

以上概括一下, AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单而又高效地构造出同步器。

AQS 内部实现

AQS 队列在内部维护了一个 FIFO 的双向链表,如果对数据结构比较熟的话,应该很容易就能想到,在双向链表中,每个节点都有两个指针,分别指向直接前驱节点和直接后继节点。使用双向链表的优点之一,就是从任意一个节点开始都很容易访问它的前驱节点和后继节点。

在 AQS 中,每个 Node 其实就是一个线程封装,当线程在竞争锁失败之后,会封装成 Node 加入到 AQS 队列中;获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的 Node (也就是线程)

AQS 使用 volatile 的变量 state 来作为资源的标识:

1
private volatile int state;

关于 state 状态的读取与修改,子类可以通过覆盖 getState() 和 setState() 方法来实现自己的逻辑,其中比较重要的是:

1
2
3
4
5
// 传入期望值 expect ,想要修改的值 update ,然后通过 Unsafe 的 compareAndSwapInt() 即 CAS 操作来实现
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

下面是 AQS 中两个重要的成员变量:

1
2
private transient volatile Node head;   // 头结点
private transient volatile Node tail;   // 尾节点

关于 AQS 维护的双向链表,在源码中是这样解释的:

1
2
3
4
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. 
CLH locks are normally used for spinlocks.  We instead use them for blocking synchronizers, 
but use the same basic tactic of holding some of the control information 
about a thread in the predecessor of its node.  

也就是 AQS 的等待队列是 “CLH” 锁定队列的变体

直接来一张图会更形象一些:

Node 节点维护的是线程,控制线程的一些操作,具体来看看是 Node 是怎么做的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标记一个节点,在 共享模式 下等待
    static final Node SHARED = new Node();
	
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标记一个节点,在 独占模式 下等待
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    // waitStatus 的值,表示该节点从队列中取消
    static final int CANCELLED =  1;
	
    /** waitStatus value to indicate successor's thread needs unparking */
    // waitStatus 的值,表示后继节点在等待唤醒
    // 只有处于 signal 状态的节点,才能被唤醒
    static final int SIGNAL    = -1;
	
    /** waitStatus value to indicate thread is waiting on condition */
    // waitStatus 的值,表示该节点在等待一些条件
    static final int CONDITION = -2;
	
	/**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
    */
    // waitStatus 的值,表示有资源可以使用,新 head 节点需要唤醒后继节点
    // 如果是在共享模式下,同步状态应该无条件传播下去
    static final int PROPAGATE = -3;

	// 节点状态,取值为 -3,-2,-1,0,1
    volatile int waitStatus;

	// 前驱节点
    volatile Node prev;

	// 后继节点
    volatile Node next;

	// 节点所对应的线程
    volatile Thread thread;

	// condition 队列中的后继节点
    Node nextWaiter;

	// 判断是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

	/**
	* 返回前驱节点
	*/
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

	/**
	* 将线程构造成一个 Node 节点,然后添加到 condition 队列中
	*/
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

	/**
	* 等待队列用到的方法
	*/
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS 如何获取资源

在 AQS 中,获取资源的入口是 acquire(int arg) 方法,其中 arg 是获取资源的个数,来看下代码:

1
2
3
4
5
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在获取资源时,会首先调用 tryAcquire 方法,这个方法是在子类中具体实现的

如果通过 tryAcquire 获取资源失败,接下来会通过 addWaiter(Node.EXCLUSIVE) 方法,将这个线程插入到等待队列中,具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Node addWaiter(Node mode) {
	// 生成该线程所对应的 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将 Node 插入到队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用 CAS 操作,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果 pred == null 或者 CAS 操作失败,则调用 enq 方法再次自旋插入
    enq(node);
    return node;
}
	
// 自旋 CAS 插入等待队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在上面能够看到使用的是 CAS 自旋插入,这是因为在 AQS 中会存在多个线程同时竞争资源的情况,进而一定会出现多个线程同时插入节点的操作,这里使用 CAS 自旋插入是为了保证操作的线程安全性

现在呢,申请 acquire(int arg) 方法,然后通过调用 addWaiter 方法,将一个 Node 插入到了队列尾部。处于等待队列节点是从头结点开始一个一个的去获取资源,获取资源方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果 Node 的前驱节点 p 是 head,说明 Node 是第二个节点,那么它就可以尝试获取资源
            if (p == head && tryAcquire(arg)) {
            	// 如果资源获取成功,则将 head 指向自己
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 节点进入等待队列后,调用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法
            // 进入阻塞状态,即只有头结点的线程处于活跃状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在获取资源时,除了 acquire 之外,还有三个方法:

  • acquireInterruptibly :申请可中断的资源(独占模式)

  • acquireShared :申请共享模式的资源

  • acquireSharedInterruptibly :申请可中断的资源(共享模式)

到这里,关于 AQS 如何获取资源就说的差不多了,接下来看看 AQS 是如何释放资源的

AQS 如何释放资源

释放资源相对于获取资源来说,简单了很多。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final boolean release(int arg) {
	// 如果释放锁成功
    if (tryRelease(arg)) {	
    	// 获取 AQS 队列中的头结点
        Node h = head;
        // 如果头结点不为空,且状态 != 0
        if (h != null && h.waitStatus != 0)
        	// 调用 unparkSuccessor(h) 方法,唤醒后续节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果状态是负数,尝试将它改为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	// 得到头结点的后继节点
    Node s = node.next;
    // 如果 waitStatus 大于 0 ,说明这个节点被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 那就从尾节点开始,找到距离 head 最近的一个 waitStatus<=0 的节点进行唤醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继节点不为空,则将其从阻塞状态变为非阻塞状态
    if (s != null)
        LockSupport.unpark(s.thread);
}

AQS 两种资源共享模式

资源有两种共享模式:

  • 独占模式( Exclusive ):资源是独占的,也就是一次只能被一个线程占有,比如 ReentrantLock

  • 共享模式( Share ):同时可以被多个线程获取,具体的资源个数可以通过参数来确定,比如 Semaphore/CountDownLatch

看到这里, AQS 你 get 了嘛?

Java Geek Tech wechat
欢迎订阅 Java 极客技术,这里分享关于 Java 的一切。