java并发AQS

  1. 1.基本概念
  2. 2.AQS内部实现
    1. 2-1.state和Node
    2. 2-2.CLH
  3. 3.AQS 如何获取资源
  4. 4.AQS 如何释放资源
  5. 5.AQS 两种资源共享模式
  6. 6.并发手册

1.基本概念

AQS 是 AbstractQueuedSynchronizer 的简称,翻译成中文就是 抽象队列同步器 ,这三个单词分开来看:

  • Abstract (抽象):也就是说, AQS 是一个抽象类,只实现一些主要的逻辑,有些方法推迟到子类实现
  • Queued (队列):队列有啥特征呢?先进先出( FIFO )对吧?也就是说, AQS 是用先进先出队列来存储数据的
  • Synchronizer (同步):即 AQS 实现同步功能

以上概括一下, AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单而又高效地构造出同步器。

2.AQS内部实现

2-1.state和Node

AQS 队列在内部维护了一个 FIFO 的双向链表,如果对数据结构比较熟的话,应该很容易就能想到,在双向链表中,每个节点都有两个指针,分别指向直接前驱节点和直接后继节点。
使用双向链表的优点之一,就是从任意一个节点开始都很容易访问它的前驱节点和后继节点。

在AQS中,每个Node其实就是一个线程封装,当线程在竞争锁失败之后,会封装成 Node 加入到 AQS 队列中;
获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的 Node (也就是线程)

AQS 使用 volatile 的变量 state 来作为资源的标识:

private volatile int state;

关于 state 状态的读取与修改,子类可以通过覆盖getState()setState()方法来实现自己的逻辑,其中比较重要的是:

// 传入期望值 expect ,想要修改的值 update ,然后通过 Unsafe 的 compareAndSwapInt() 即 CAS 操作来实现
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

下面是 AQS 中两个重要的成员变量:

private transient volatile Node head;   // 头结点
private transient volatile Node tail;   // 尾节点

直接来一张图会更形象一些:

img

Node 节点维护的是线程,控制线程的一些操作,具体来看看是 Node 是怎么做的:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标记一个节点,在 共享模式 下等待
    static final Node SHARED = new Node();

    /** Marker to indicate a node is waiting in exclusive mode */
    // 标记一个节点,在 独占模式 下等待
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    // waitStatus 的值,表示该节点从队列中取消
    static final int CANCELLED =  1;

    /** waitStatus value to indicate successor's thread needs unparking */
    // waitStatus 的值,表示后继节点在等待唤醒
    // 只有处于 signal 状态的节点,才能被唤醒
    static final int SIGNAL    = -1;

    /** waitStatus value to indicate thread is waiting on condition */
    // waitStatus 的值,表示该节点在等待一些条件
    static final int CONDITION = -2;

 /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
    */
    // waitStatus 的值,表示有资源可以使用,新 head 节点需要唤醒后继节点
    // 如果是在共享模式下,同步状态应该无条件传播下去
    static final int PROPAGATE = -3;

 // 节点状态,取值为 -3,-2,-1,0,1
    volatile int waitStatus;

 // 前驱节点
    volatile Node prev;

 // 后继节点
    volatile Node next;

 // 节点所对应的线程
    volatile Thread thread;

 // condition 队列中的后继节点
    Node nextWaiter;

 // 判断是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

 /**
 * 返回前驱节点
 */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

 /**
 * 将线程构造成一个 Node 节点,然后添加到 condition 队列中
 */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

 /**
 * 等待队列用到的方法
 */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2-2.CLH

关于 AQS 维护的双向链表,在源码中是这样解释的:

The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. 
CLH locks are normally used for spinlocks.  We instead use them for blocking synchronizers, 
but use the same basic tactic of holding some of the control information 
about a thread in the predecessor of its node.  

CLH锁即Craig, Landin, and Hagersten (CLH) locks。CLH锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。

也就是 AQS 的等待队列是 CLH 锁定队列的变体

CLH缺点
CLH队列锁的长处是空间复杂度低(假设有n个线程。L个锁,每一个线程每次仅仅获取一个锁,那么须要的存储空间是O(L+n),n个线程有n个myNode。L个锁有L个tail),
CLH的一种变体被应用在了JAVA并发框架中。

CLH缺点
唯一的缺点是在NUMA系统结构下性能非常差。
在这样的系统结构下,每一个线程有自己的内存,假设前趋结点的内存位置比较远。自旋推断前趋结点的locked域,性能将大打折扣,可是在SMP系统结构下该法还是非常有效的。

CLH算法实现

public class CLHLock implements Lock {  
    AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());  
    ThreadLocal<QNode> myPred;  
    ThreadLocal<QNode> myNode;  

    public CLHLock() {  
        tail = new AtomicReference<QNode>(new QNode());  
        myNode = new ThreadLocal<QNode>() {  
            protected QNode initialValue() {  
                return new QNode();  
            }  
        };  
        myPred = new ThreadLocal<QNode>() {  
            protected QNode initialValue() {  
                return null;  
            }  
        };  
    }  

    @Override  
    public void lock() {  
        QNode qnode = myNode.get();  
        qnode.locked = true;  
        QNode pred = tail.getAndSet(qnode);  
        myPred.set(pred);  
        while (pred.locked) {  
        }  
    }  

    @Override  
    public void unlock() {  
        QNode qnode = myNode.get();  
        qnode.locked = false;  
        myNode.set(myPred.get());  
    }  
}     

3.AQS 如何获取资源

在 AQS 中,获取资源的入口是 acquire(int arg) 方法,其中 arg 是获取资源的个数,来看下代码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在获取资源时,会首先调用 tryAcquire 方法,这个方法是在子类中具体实现的

如果通过 tryAcquire 获取资源失败,接下来会通过 addWaiter(Node.EXCLUSIVE) 方法,将这个线程插入到等待队列中,具体代码:

private Node addWaiter(Node mode) {
 // 生成该线程所对应的 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将 Node 插入到队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用 CAS 操作,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果 pred == null 或者 CAS 操作失败,则调用 enq 方法再次自旋插入
    enq(node);
    return node;
}

// 自旋 CAS 插入等待队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在上面能够看到使用的是 CAS 自旋插入,这是因为在 AQS 中会存在多个线程同时竞争资源的情况,进而一定会出现多个线程同时插入节点的操作,这里使用 CAS 自旋插入是为了保证操作的线程安全性

现在呢,申请 acquire(int arg) 方法,然后通过调用 addWaiter 方法,将一个 Node 插入到了队列尾部。处于等待队列节点是从头结点开始一个一个的去获取资源,获取资源方式如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果 Node 的前驱节点 p 是 head,说明 Node 是第二个节点,那么它就可以尝试获取资源
            if (p == head && tryAcquire(arg)) {
             // 如果资源获取成功,则将 head 指向自己
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 节点进入等待队列后,调用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法
            // 进入阻塞状态,即只有头结点的线程处于活跃状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在获取资源时,除了 acquire 之外,还有三个方法:

  • acquireInterruptibly :申请可中断的资源(独占模式)
  • acquireShared :申请共享模式的资源
  • acquireSharedInterruptibly :申请可中断的资源(共享模式)

到这里,关于 AQS 如何获取资源就说的差不多了,接下来看看 AQS 是如何释放资源的

4.AQS 如何释放资源

释放资源相对于获取资源来说,简单了很多。源码如下:

public final boolean release(int arg) {
 // 如果释放锁成功
    if (tryRelease(arg)) { 
     // 获取 AQS 队列中的头结点
        Node h = head;
        // 如果头结点不为空,且状态 != 0
        if (h != null && h.waitStatus != 0)
         // 调用 unparkSuccessor(h) 方法,唤醒后续节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果状态是负数,尝试将它改为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
 // 得到头结点的后继节点
    Node s = node.next;
    // 如果 waitStatus 大于 0 ,说明这个节点被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 那就从尾节点开始,找到距离 head 最近的一个 waitStatus<=0 的节点进行唤醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继节点不为空,则将其从阻塞状态变为非阻塞状态
    if (s != null)
        LockSupport.unpark(s.thread);
}

5.AQS 两种资源共享模式

资源有两种共享模式:

  • 独占模式( Exclusive ):资源是独占的,也就是一次只能被一个线程占有,比如 ReentrantLock
  • 共享模式( Share ):同时可以被多个线程获取,具体的资源个数可以通过参数来确定,比如 Semaphore/CountDownLatch

6.并发手册

最好配合手册全面学习


转载请注明来源,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 157162006@qq.com

文章标题:java并发AQS

字数:2.3k

本文作者:沐雨云楼

发布时间:2020-06-02, 20:01:48

最后更新:2020-09-12, 21:21:47

原始链接:https://iworkh.gitee.io/blog/2020/06/02/java-concurrent-aqs/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

pgmanor iworkh gitee