JUC Study Notes - Reading the AbstractQueuedSynchronizer (AQS) Source Code

2021-12-03

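I. Getting to Know AbstractQueuedSynchronizer

AbstractQueuedSynchronizer, AQS for short, is the base component for building synchronizers. The Lock implementations and several concurrency utilities in java.util.concurrent (JUC) are built on top of it.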

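AQS arranges all requesting threads into a CLH queue. When a thread finishes its work (lock.unlock()), it wakes its successor node. The thread that is currently running is not waiting in the queue, while the threads waiting to run are all blocked (park()). [For a description of the CLH queue, see: https://blog.csdn.net/firebolt100/article/details/82662102]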

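  • AQS uses a built-in FIFO doubly linked queue to line up waiting threads. Internally, the fields head and tail record the first and last elements of the queue, and every element is of type Node.
/* Head of the wait queue, lazily initialized: when a thread that lost the race is being added to the sync
queue and reaches enq, a head node is created. Apart from initialization it is modified only through setHead, and the head node's waitStatus is never CANCELLED. */
private transient volatile Node head;
/** Tail of the wait queue, also lazily initialized (in enq). Modified only when a new waiting node is appended. */
private transient volatile Node tail;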


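  • AQS maintains a volatile int state (representing the shared resource) and a FIFO thread wait queue (threads that block while contending for the resource enter this queue). Subclasses must define the protected methods that change state; these methods define what acquiring and releasing state means. [volatile does not make compound operations atomic, but it does guarantee that state is visible across threads; atomic updates go through compareAndSetState.]
// Core of AQS: the synchronization state
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}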
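  • AQS defines two ways of sharing the resource: Exclusive (only one thread can run at a time, e.g. ReentrantLock) and Share (several threads can run at the same time, e.g. Semaphore/CountDownLatch).
    Different custom synchronizers contend for the shared resource in different ways. A custom synchronizer only needs to implement how the shared resource state is acquired and released; maintaining the thread wait queue (enqueueing after a failed acquire, waking and dequeueing, and so on) is already implemented by AQS at the top level. A custom synchronizer mainly implements the following methods:
1. isHeldExclusively(): whether the current thread holds the resource exclusively. It only needs to be implemented if Condition is used.
2. tryAcquire(int): exclusive mode. Tries to acquire the resource; returns true on success, false on failure.
3. tryRelease(int): exclusive mode. Tries to release the resource; returns true on success, false on failure.
4. tryAcquireShared(int): shared mode. Tries to acquire the resource. A negative value means failure; 0 means success with no resource left over; a positive value means success with resource still available.
5. tryReleaseShared(int): shared mode. Tries to release the resource; returns true if the release allows waiting nodes to be woken, false otherwise.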

These methods come up again later.
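To make the exclusive-mode hooks concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its demo() helper are made up for illustration; only the AQS methods it calls (acquire, release, compareAndSetState, setState, getState, setExclusiveOwnerThread, getExclusiveOwnerThread) come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex, state 0 = unlocked, 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Only one thread can win the CAS from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // fully released, so AQS may wake a waiter
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }     // queueing is handled by AQS
    public void unlock() { sync.release(1); }

    // Four threads increment a plain int 1000 times each under the mutex.
    static int demo() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4000
    }
}
```

Note that the class never touches the wait queue itself: it only says when state may change, and acquire/release do the rest.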

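Take ReentrantLock as an example. state is initialized to 0, meaning unlocked. When thread A calls lock(), tryAcquire() takes the lock exclusively and adds 1 to state. From then on, tryAcquire() from any other thread fails until A has called unlock() often enough to bring state back to 0 (that is, released the lock); only then do other threads get a chance to acquire it. Before releasing, thread A itself can acquire the lock again (state keeps accumulating), which is what reentrancy means. Note that the lock must be released as many times as it was acquired, otherwise state never returns to zero.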
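The reentrant counting can be observed from the outside with ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. This is a small sketch; the class name ReentrancyDemo is made up.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: watch the hold count rise and fall.
class ReentrancyDemo {
    // Returns the hold count seen after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();
        seen[0] = lock.getHoldCount();
        lock.lock();                    // same thread acquires again: reentrant
        seen[1] = lock.getHoldCount();
        lock.unlock();
        seen[2] = lock.getHoldCount();
        lock.unlock();                  // count back to zero: lock is free
        seen[3] = lock.getHoldCount();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```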

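Take CountDownLatch as another example. A task is split across N worker threads, and state is initialized to N (note that N must match the number of threads, since each thread counts down once). The N workers run in parallel; each one calls countDown() once when it finishes, which decrements state by 1 via CAS. Once all workers are done (state=0), the waiting main thread is unpark()ed, returns from await(), and carries on.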
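The same idea can be sketched directly on AQS in shared mode. The class SimpleLatch below is a made-up, stripped-down latch written for illustration, not the JDK's CountDownLatch source: tryAcquireShared succeeds only when state is 0, and tryReleaseShared decrements state with a CAS loop.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a count-down latch in AQS shared mode.
class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int count() { return getState(); }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive: success, others may also succeed. Negative: must wait.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;          // already open, nothing to wake
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;      // true only for the final count-down
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) { sync = new Sync(count); }

    void countDown() { sync.releaseShared(1); }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    int getCount() { return sync.count(); }

    // Starts `workers` threads that each count down once, then waits for them.
    static int demo(int workers) {
        SimpleLatch latch = new SimpleLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();                 // returns once state reaches 0
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return latch.getCount();
    }
}
```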

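In general a custom synchronizer is either exclusive or shared, so it only needs to implement one of the two pairs tryAcquire/tryRelease and tryAcquireShared/tryReleaseShared. AQS also allows a custom synchronizer to implement both modes at once, as ReentrantReadWriteLock does.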

  • The Node class
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

II. Reading the AQS Source Code

1. Node status: waitStatus

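/** waitStatus value to indicate thread has cancelled */
// The node has been cancelled. A node enters this state on timeout or on interrupt (in the interruptible variants); once cancelled, its state never changes again.
static final int CANCELLED =  1;

/** waitStatus value to indicate successor's thread needs unparking */
// The successor is waiting for this node to wake it. When a successor enqueues, it updates its predecessor's status to SIGNAL.
static final int SIGNAL    = -1;

/** waitStatus value to indicate thread is waiting on condition */
// The node is waiting on a Condition. When another thread calls signal() on the Condition, the CONDITION node is transferred from the condition queue to the sync queue, where it waits to acquire the lock.
static final int CONDITION = -2;

/**
  * waitStatus value to indicate the next acquireShared should
  * unconditionally propagate
  */
// In shared mode, a node not only wakes its successor but may also cause the successor's successor to be woken.
static final int PROPAGATE = -3;

// waitStatus is the wait status of this node's thread; a newly enqueued node defaults to 0.
volatile int waitStatus;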

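Note: a negative value means the node is in a valid waiting state, while a positive value means the node has been cancelled. That is why the source so often tests >0 or <0 to decide whether a node is in a normal state. [Keep these states in mind; the source code below relies on them.]

2. The acquire(int) method

This method is the top-level entry point for a thread acquiring the shared resource in exclusive mode. If the resource is acquired, the thread returns right away; otherwise it enters the wait queue and stays there until it gets the resource, ignoring interrupts throughout. This is exactly the semantics of lock(), though it is not limited to lock(). Once the resource is acquired, the thread can go on to run its critical section (the code that uses the shared resource). The source of acquire() is: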

/**
  * Acquires in exclusive mode, ignoring interrupts.  Implemented
  * by invoking at least once {@link #tryAcquire},
  * returning on success.  Otherwise the thread is queued, possibly
  * repeatedly blocking and unblocking, invoking {@link
  * #tryAcquire} until success.  This method can be used
  * to implement method {@link Lock#lock}.
  *
  * @param arg the acquire argument.  This value is conveyed to
  *        {@link #tryAcquire} but is otherwise uninterpreted and
  *        can represent anything you like.
  */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // Re-assert the interrupt on the current thread
        selfInterrupt();
}

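As the Javadoc says, acquire works in exclusive mode and ignores interrupts. It calls tryAcquire(int) at least once; if tryAcquire(int) returns true, acquire returns immediately, otherwise the current thread has to join the queue and wait. The flow is:

  1. tryAcquire() tries to acquire the resource directly; on success, return immediately;
  2. addWaiter() appends the thread to the tail of the wait queue, marked as exclusive mode;
  3. acquireQueued() makes the thread acquire the resource from within the queue and returns only once it has the resource. It returns true if the thread was interrupted at any point during the wait, false otherwise.
  4. If the thread is interrupted while waiting, it does not respond at that moment. Only after acquiring the resource does it call selfInterrupt() to re-assert the interrupt.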
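Step 4 can be observed through ReentrantLock.lock(), which is built on acquire(). The sketch below uses a made-up class name, LockIgnoresInterruptDemo. It interrupts a thread while that thread is queued for the lock, then checks that the thread still obtains the lock and sees its interrupt flag set afterwards.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: lock() does not abort on interrupt, it re-asserts it.
class LockIgnoresInterruptDemo {
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagSeen = new AtomicBoolean(false);

        lock.lock();                       // the calling thread holds the lock first
        Thread waiter = new Thread(() -> {
            lock.lock();                   // has to queue up behind the caller
            try {
                // The interrupt was not lost: the flag is set once lock() returns
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();            // wait until the waiter is in the queue
            }
            waiter.interrupt();            // interrupt it while it is still waiting
        } finally {
            lock.unlock();                 // only now can the waiter acquire
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagAfterLock()); // prints true
    }
}
```

A caller that needs to abort on interrupt would use lockInterruptibly() instead, which goes through acquireInterruptibly().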

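2.1 The tryAcquire(int) method

tryAcquire tries to acquire the resource in exclusive mode. It returns true if the acquire succeeds and false otherwise. It can be used to implement tryLock() in Lock. The default implementation throws UnsupportedOperationException; the real implementation is supplied by the custom synchronizer class that extends AQS. Here AQS only defines the common method framework.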

   /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

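Note: the method is deliberately not declared abstract. Exclusive mode only needs tryAcquire/tryRelease, and shared mode only needs tryAcquireShared/tryReleaseShared. If all of them were abstract, every synchronizer would also have to implement the hooks of the mode it does not use. In short, this avoids unnecessary work.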
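A short sketch of what this means in practice: the made-up class SharedOnlySync below overrides only the shared-mode hooks. It compiles without touching tryAcquire, and calling the exclusive entry point simply hits the default implementation.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: implements the shared-mode hooks only.
class SharedOnlySync extends AbstractQueuedSynchronizer {
    @Override
    protected int tryAcquireShared(int arg) {
        return 1;        // always succeeds in this toy example
    }

    @Override
    protected boolean tryReleaseShared(int arg) {
        return true;
    }

    static boolean exclusiveAcquireIsUnsupported() {
        SharedOnlySync sync = new SharedOnlySync();
        sync.acquireShared(1);             // shared mode works
        try {
            sync.acquire(1);               // falls through to the default tryAcquire
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(exclusiveAcquireIsUnsupported()); // prints true
    }
}
```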

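2.2 The addWaiter(Node) method

This method appends the current thread to the tail of the wait queue in the given mode (Node.EXCLUSIVE or Node.SHARED) and returns the node that holds the current thread. If the queue is not empty, it tries once to append the node with a CAS via compareAndSetTail. If the queue is empty or that CAS fails, it falls back to enq(node), which initializes the queue if necessary and retries until the node is appended. The source is: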

   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // Build a node in the given mode. There are two modes: EXCLUSIVE and SHARED
        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred; // the current tail becomes this node's predecessor
            if (compareAndSetTail(pred, node)) { // CAS the tail to this node
                pred.next = node;
                return node;
            }
        }

        // The fast path failed: the queue is empty or the CAS lost a race.
        // enq initializes the queue if needed and retries until the node is appended.
        enq(node);
        return node;
    }

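2.2.1 The enq(Node) method

enq(node) inserts the node into the wait queue and initializes the queue first if it is empty. The whole process is a CAS retry loop that runs until the node has been appended to the tail. The source is: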

   /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        // CAS retry loop until the node is appended to the tail
        for (;;) {
            Node t = tail;
            // Queue is empty: create an empty marker node as head and point tail at it too
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // Normal path: append to the tail
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
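The same enqueue pattern can be reproduced outside AQS with AtomicReference instead of Unsafe. The class CasTailQueue below is a made-up, simplified sketch (it does no parking and has no waitStatus). It shows the lazily created dummy head, the CAS on tail, and why prev links are reliable while next links are set a moment later.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class: lock-free tail insertion in the style of enq.
class CasTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like AQS.enq.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazy initialization: exactly one thread installs the dummy head
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                     // prev is set before the node is published
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                 // next is set only after the CAS
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail.
    int sizeFromTail() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    // Four threads append 250 nodes each; no node may be lost.
    static int demo() {
        CasTailQueue queue = new CasTailQueue();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 250; j++) {
                    queue.enq(new Node());
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.sizeFromTail();
    }
}
```

Walking backwards from the tail is the safe direction because every node's prev is written before the CAS that makes the node reachable.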

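2.3 The acquireQueued(Node, int) method

acquireQueued() lets a thread that is already in the queue acquire the synchronization state in exclusive, uninterruptible mode, looping until it holds the lock before returning. The implementation has two parts: if the node's predecessor is the head and tryAcquire succeeds, the node becomes the new head and the method returns; otherwise the method checks whether the thread should park, parks it, and after wake-up checks whether the thread was interrupted. The source is: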

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
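    final boolean acquireQueued(final Node node, int arg) {
        // Tracks whether the acquire failed to complete
        boolean failed = true;
        try {
            // Tracks whether the thread was interrupted while waiting
            boolean interrupted = false;
            // Retry loop
            for (;;) {
                // Get the predecessor
                final Node p = node.predecessor();
                // If the predecessor is head, try to acquire
                if (p == head && tryAcquire(arg)) {
                    // Acquired: make this node the head. So the node that head points to is the node that currently holds the resource, or null.
                    setHead(node);
                    // setHead already set node.prev to null; the old head's next is set to null here as well so the old head can be garbage collected. The node that held the resource before has now left the queue.
                    p.next = null; // help GC
                    // The acquire succeeded
                    failed = false;
                    // Report whether the thread was interrupted while waiting
                    return interrupted;
                }
                // The acquire failed: park() into the waiting state until unpark()ed. An interrupt also wakes the thread from park(); it then finds it still cannot acquire and goes back to park().
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // Interrupted at least once while waiting: remember it
                    interrupted = true;
            }
        } finally {
            // If the acquire did not complete (for example on timeout, or on interrupt in the interruptible variants), cancel this node's wait in the queue.
            if (failed)
                cancelAcquire(node);
        }
    }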

2.3.1 The shouldParkAfterFailedAcquire(Node, Node) method

shouldParkAfterFailedAcquire looks at the status of the current node's predecessor and acts on the current node accordingly.

   /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Read the predecessor's status
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // The predecessor is SIGNAL, so it will wake this node on release
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // The predecessor was cancelled: walk backwards to the nearest node that is still waiting normally and queue up behind it
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // The predecessor is waiting normally: set its status to SIGNAL so that it notifies this node when it releases
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

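    // CAS the node's wait status
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

Throughout this flow, if the predecessor's status is not SIGNAL, the thread cannot rest easy yet: it first has to find a safe resting point (a live predecessor marked SIGNAL), and in the meantime it gets another try at acquiring in case its turn has come.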

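2.3.2 The parkAndCheckInterrupt() method

This method puts the thread to rest, so that it really enters the waiting state. park() moves the current thread into the WAITING state. There are two ways to wake it from there: 1) unpark(); 2) interrupt(). Note that Thread.interrupted() clears the current thread's interrupt flag.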

   /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
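The interaction between park() and the interrupt flag is easy to check in isolation. In this sketch (the class name ParkInterruptDemo is made up) a thread interrupts itself and then parks: park() returns right away because the flag is set, the first Thread.interrupted() reports and clears the flag, and the second call sees it cleared.

```java
import java.util.Arrays;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: park() returns on interrupt, Thread.interrupted() clears the flag.
class ParkInterruptDemo {
    static boolean[] parkThenCheck() {
        boolean[] result = new boolean[2];
        Thread t = new Thread(() -> {
            Thread.currentThread().interrupt(); // set the interrupt flag up front
            LockSupport.park();                 // does not block: the thread is interrupted
            result[0] = Thread.interrupted();   // reports the flag and clears it
            result[1] = Thread.interrupted();   // the flag is already cleared
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parkThenCheck())); // prints [true, false]
    }
}
```

Because the flag is cleared here, acquireQueued has to remember the interrupt in its local variable and acquire() has to re-assert it with selfInterrupt().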

2.3.3 The cancelAcquire(Node) method

Marks the current node as cancelled and removes it from the wait queue.

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;  // clear the node's thread

        // Skip cancelled predecessors
        Node pred = node.prev;  // this node's predecessor
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;  // link this node behind the nearest non-cancelled node

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        // The node that currently follows the non-cancelled predecessor
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // Mark this node as cancelled
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        // If this node is the tail, drop it and make the predecessor the new tail
        if (node == tail && compareAndSetTail(node, pred)) {
            // Clear the predecessor's next link
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
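Cancellation is what happens behind a timed acquire that gives up. A small sketch with a made-up class name, TimedAcquireCancelDemo: one thread holds a ReentrantLock while another calls tryLock with a 50 ms timeout. After the timeout the waiter's node has been cancelled, so the lock reports no queued threads.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a timed-out waiter leaves no live node in the queue.
class TimedAcquireCancelDemo {
    // Returns the queue length after the waiter timed out, or -1 if it unexpectedly acquired.
    static int queueLengthAfterTimeout() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = {false};
        lock.lock();                       // held for the whole test, so the waiter must time out
        try {
            Thread waiter = new Thread(() -> {
                try {
                    acquired[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            try {
                waiter.join();             // the waiter has given up by the time join returns
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            // getQueueLength counts only nodes that still carry a thread
            return acquired[0] ? -1 : lock.getQueueLength();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(queueLengthAfterTimeout()); // prints 0
    }
}
```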

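2.3.4 The execution flow of acquireQueued()

  1. After the node has joined the tail of the queue, check the status and find a safe resting point;
  2. Call park() to enter the waiting state and wait for unpark() or interrupt() to wake the thread;
  3. After waking, check whether the resource can be acquired. If so, head is pointed at the current node and the method returns whether the thread was interrupted at any point between enqueueing and acquiring; if not, continue from step 1.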
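The loop above (only the front waiter tries, everyone else parks, interrupts are recorded and re-asserted at the end) can be written out in a few lines. The sketch below is adapted from the FIFOMutex example in the LockSupport class documentation; it uses a ConcurrentLinkedQueue rather than AQS's own linked nodes, and the class name ParkingFifoMutex and the demo() helper are made up.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical example class: a first-in-first-out mutex built on park/unpark.
class ParkingFifoMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);                       // step 1: join the tail

        // Block while not first in the queue or while the lock is taken
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);                 // step 2: wait for unpark or interrupt
            if (Thread.interrupted()) {
                wasInterrupted = true;              // remember, but keep waiting
            }
        }

        waiters.remove();                           // step 3: acquired, leave the queue
        if (wasInterrupted) {
            current.interrupt();                    // re-assert the interrupt on exit
        }
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());         // wake the front waiter, if any
    }

    // Four threads increment a plain int 1000 times each under the mutex.
    static int demo() {
        ParkingFifoMutex mutex = new ParkingFifoMutex();
        int[] counter = {0};
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

The while loop re-checks its condition after every wake-up, which is the same reason acquireQueued loops: a return from park() says nothing about whether the resource is actually available.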

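2.4 The execution flow of acquire(int)

  1. Call the custom synchronizer's tryAcquire() to try to acquire the resource directly; on success, return immediately;
  2. On failure, addWaiter() appends the thread to the tail of the wait queue, marked as exclusive mode;
  3. acquireQueued() lets the thread rest in the wait queue and try to acquire the resource when it gets the chance (when its turn comes, it is unpark()ed). It returns only once the resource is acquired, returning true if the thread was interrupted at any point during the wait and false otherwise.
  4. If the thread is interrupted while waiting, it does not respond at that moment. Only after acquiring the resource does it call selfInterrupt() to re-assert the interrupt.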


References:

  1. https://www.cnblogs.com/waterystone/p/4920797.html
  2. https://www.cnblogs.com/fsmly/p/11274572.html
  3. http://gee.cs.oswego.edu/dl/papers/aqs.pdf - Doug Lea's paper
