JUC学习 - AbstractQueuedSynchronizer源码解读(AQS)(补)

2021-12-03
4

接上一篇博客 https://blog.csdn.net/qq_43605444/article/details/121705312?spm=1001.2014.3001.5501

3、release(int) 方法

release(int)方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
    
    
        if (tryRelease(arg)) {
    
    
            // 找到头结点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒等待队列里的下一个线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
    
    
        throw new UnsupportedOperationException();
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
	// 此方法用于唤醒等待队列中下一个线程。
    private void unparkSuccessor(Node node) {
    
    
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 获取结点的状态,node一般为当前线程所在的结点。
        int ws = node.waitStatus;
        // 置零当前线程所在的结点状态,允许失败。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 找到下一个需要唤醒的结点s
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
    
     // 如果为空或已取消
            s = null;  // 清空结点
            for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
                if (t.waitStatus <= 0)  // 从这里可以看出,<=0的结点,都是还有效的结点。
                    s = t;  // 找到最接近 node 的结点
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 唤醒找到的结点的线程
    }

正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state -= arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state = 0),要返回true,否则返回false。

unparkSuccessor(Node)方法用于唤醒等待队列中下一个线程。这里要注意的是,下一个线程并不一定是当前节点的next节点,而是下一个可以用来唤醒的线程,如果这个节点存在,调用unpark()方法唤醒。

总之,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state = 0),它会唤醒等待队列里的其他线程来获取资源。

4、acquireShared(int) 方法

acquireShared(int)方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
    
    
        // 尝试获取资源,负值:获取失败  0:获取成功,没有剩余资源  大于0:获取成功,有剩余资源,其他线程可以继续获取
        if (tryAcquireShared(arg) < 0)
            // 获取资源失败进入等待队列
            doAcquireShared(arg);
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
    
    
        throw new UnsupportedOperationException();
    }

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

4.1 doAcquireShared(int) 方法

将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。源码如下:

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
    
    
        // 加入队列尾部
        final Node node = addWaiter(Node.SHARED);
        // 是否成功标志
        boolean failed = true;
        try {
    
    
            // 等待过程中是否被中断过的标志
            boolean interrupted = false;
            for (;;) {
    
    
                final Node p = node.predecessor(); // 获取当前结点的前驱
                // 如果当前结点是 head 的下一个结点,因为 head 是拿到资源的线程,此时 node 被唤醒,很可能是 head 用完资源来唤醒自己的
                if (p == head) {
    
    
                    int r = tryAcquireShared(arg); // 尝试获取资源
                    if (r >= 0) {
    
     // 成功
                        // 将当前结点设置为 head 结点,还有剩余资源可以再唤醒之后的线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        // 如果等待过程中被打断过,此时将中断补上。
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
                    interrupted = true;
            }
        } finally {
    
    
            // 获取资源失败,将结点移出等待队列
            if (failed)
                cancelAcquire(node);
        }
    }

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。实现如下:

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
    
    
        Node h = head; // Record old head for check below
        setHead(node); // head指向自己
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        // 如果还有剩余量,继续唤醒下一个邻居线程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
    
    
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!至此,acquireShared()也要告一段落了。让我们再梳理一下它的流程:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

5、releaseShared(int)方法

releaseShared(int)方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
    
    
        // 尝试释放资源
        if (tryReleaseShared(arg)) {
    
    
            // 唤醒后继结点
            doReleaseShared();
            return true;
        }
        return false;
    }

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。

5.1 doReleaseShared()

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
    
    
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
    
    
            Node h = head;
            // 头结点不为空,并且不是尾结点,说明还有其他结点在等待队列中
            if (h != null && h != tail) {
    
    
                // 获取头结点的等待状态
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
    
    
                    // 将当前头结点的状态设置为 0 不成功则继续循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 重置头结点的等待状态为 0 成功后,唤醒后继
                    unparkSuccessor(h);  
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head) // head发生变化              // loop if head changed
                break;
        }
    }

6、简单应用

6.1 Mutex(互斥锁)

Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。下边是Mutex的核心源码:【下面的代码是 AbstractQueuedSynchronizer 类的顶部注释中的例子】

class Mutex implements Lock, java.io.Serializable {
    
    
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
    
    
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
    
    
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
    
    
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {
    
     // state 为 0 才设置为 1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread()); // 设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
    
    
            assert releases == 1; // 限定为1个量
            if (getState() == 0) // 既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null); 
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    // lock <--> acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
    
    
        sync.acquire(1);
    }

    // tryLock <--> tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
    
    
        return sync.tryAcquire(1);
    }

    // unlock <--> release。两者语文一样:释放资源。
    public void unlock() {
    
    
        sync.release(1);
    }

    // 锁是否占有状态
    public boolean isLocked() {
    
    
        return sync.isHeldExclusively();
    }
}

同步类在实现时一般都将自定义同步器(Sync)定义为内部类,供自己使用(如下图);而同步类自己(Mutex)则实现某个接口,对外服务。当然,接口的实现要直接依赖Sync,它们在语义上也存在某种对应关系!!而Sync只用实现资源state的获取-释放方式tryAcquire-tryRelelase,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,我们不用关心。
在这里插入图片描述
除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,不同的地方就在获取-释放资源的方式tryAcquire-tryRelelase。

到此,AQS的源码已经解读完了,写的不是很详细,可能解读的不是很清楚,不过是博主在看源码时候的解读,不一定是最正确的,希望谅解,同时也谢谢以下的参考文章。

参考文章:

  1. https://www.cnblogs.com/waterystone/p/4920797.html
  2. https://www.cnblogs.com/fsmly/p/11274572.html
  3. http://gee.cs.oswego.edu/dl/papers/aqs.pdf - Doug Lea 大神的论文

评论