流失 發表於 2026-2-5 13:05:00

基于AQS实现的ReentrantLock

<h1 id="基于aqs实现的reentrantlock">基于AQS实现的ReentrantLock</h1>
<p>这里的源码用的Java8版本</p>
<h2 id="lock方法">lock方法</h2>
<p>当ReentrantLock类的实例对象尝试获取锁的时候,调用lock方法,</p>
<p><img src="https://img2024.cnblogs.com/blog/3592839/202602/3592839-20260205130512179-38125355.png" alt="image" loading="lazy"></p>
<p>会进入sync的lock方法,其中Sync是ReentrantLock的一个内部类,ReentrantLock构造方法会默认使用非公平锁<code>NonfairSync</code>,这个类是继承于Sync的</p>
<pre><code class="language-java">      final void lock() {
            if (!initialTryLock())
                acquire(1);
      }
// 其中Sync的initialTryLock是抽象方法,需要看非公平锁实现方法
</code></pre>
<blockquote>
<p>[!TIP]<br>
在这里是第一次尝试获取锁</p>
</blockquote>
<p>由于ReentrantLock是个可重入锁,判断里有重入的判断</p>
<pre><code class="language-java">final boolean initialTryLock() {
            Thread current = Thread.currentThread();
                        // 获取当前线程的对象
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                        // 用CAS比较state状态是否为0(无人持有锁),如果是,就转为1(获取到锁)
                setExclusiveOwnerThread(current);
                        // 将当前进程设置为拥有锁的线程
                return true;
            } else if (getExclusiveOwnerThread() == current) {
                        // 当前线程为拥有锁的线程(重复获取),重入
                int c = getState() + 1;
                if (c &lt; 0) // overflow
                        // 负数,state是个int类型数据,超出可能导致溢出变为负数
                  throw new Error("Maximum lock count exceeded");
                setState(c);
                        // 设置新的state
                return true;
            } else
                        // 已有线程占锁,返回为false
                return false;
      }
</code></pre>
<p>然后开始调用acquire方法,传入1</p>
<pre><code class="language-java">    public final void acquire(int arg) {
      if (!tryAcquire(arg) &amp;&amp;
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
</code></pre>
<p>调用tryAcquire()方法,其中tryAcquire()方法是一个只有抛出异常的方法,需要重写,我们看非公平锁的写法</p>
<p>‍</p>
<blockquote>
<p>[!TIP]<br>
这是第二次获取锁</p>
</blockquote>
<pre><code class="language-java">      protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 &amp;&amp; !hasQueuedPredecessors() &amp;&amp;
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
      }
</code></pre>
<p>这里,如果state是0,即没有线程占用锁的情况下<code>getState() == 0</code>​这个为真<code>!hasQueuedPredecessors()</code>执行这个方法,这个方法会检查是否已经出现了等待队列</p>
<pre><code class="language-java">    public final boolean hasQueuedPredecessors() {
      Thread first = null; Node h, s;
      if ((h = head) != null &amp;&amp; ((s = h.next) == null ||
                                 (first = s.waiter) == null ||
                                 s.prev == null))
            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
      return first != null &amp;&amp; first != Thread.currentThread();
    }
</code></pre>
<p>当未出现 同步队列/阻塞队列 ,或者当前线程是队列的第一个时,执行compareAndSetState(0, acquires),第二次尝试获取锁,如果成功,返回真</p>
<p>否则返回假,执行<code>acquireQueued(addWaiter(Node.EXCLUSIVE), arg))</code></p>
<pre><code class="language-java">    private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                        // 尝试加入队尾
                pred.next = node;
                return node;
            }
      }
      enq(node);
      return node;
    }
</code></pre>
<p>Node是双向队列:阻塞队列一个节点,是为了保证原子化所以包装起来的</p>
<p>如果tail尾指针指向的节点不为空,则设置新生成的为尾指针指向的</p>
<p>否则(阻塞队列为空),调用enq函数</p>
<pre><code class="language-java">    private Node enq(final Node node) {
      for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                        // 使用CAS,防止多线程同时创建头节点,所以本质上还是需要抢入队顺序
                  tail = head;
                        // 初始化头节点,并将尾指针指向头节点
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                        // 判断t是否为尾节点,如果有线程更快的改掉尾节点,那么修改失败,
                        // 重新进入for循环
                  t.next = node;
                  return t;
                        // 修改成功
                }
            }
      }
    }
</code></pre>
<blockquote>
<p>[!TIP]<br>
这是第三次尝试获取锁</p>
</blockquote>
<pre><code class="language-java">    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                        // 获取node的前一个节点,如果前一个节点是头节点(当前节点是第一个)
                        // 执行tryAcquire(arg),执行第三次尝试获取锁
                if (p == head &amp;&amp; tryAcquire(arg)) {
                        // 获取锁成功,出队
                  setHead(node);// 将node设为头节点
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &amp;&amp;
                  parkAndCheckInterrupt())
                  interrupted = true;
            }
      } finally {
            if (failed)
                cancelAcquire(node);
      }
    }
</code></pre>
<p>如果第三次尝试获取锁失败了,会调用shouldParkAfterFailedAcquire()方法,将node的前一个节点传入(node一直都是加入的节点)</p>
<pre><code class="language-java">    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
                // 确认前面的节点处于SIGNAL状态,即确认前面的节点会叫醒自己
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
      if (ws &gt; 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus &gt; 0);
                        // Node里面仅有一个大于零的状态,即1取消状态,也就是说当前任务被取消了
                        // 持续循环值找到不再取消的节点
            pred.next = node;
      } else {
                // 将前一个节点用CAS转为Node.SIGNAL状态-1,返回为false
            /*
             * waitStatus must be 0 or PROPAGATE.Indicate that we
             * need a signal, but don't park yet.Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
    }
</code></pre>
<blockquote>
<p>这里插一嘴,Node节点有一些状态,来体现其的任务状态,如前面传入的就是独占队列,<code>addWaiter(Node.EXCLUSIVE)</code></p>
<pre><code class="language-java">    static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
                // 共享队列
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;
                // 独占队列
      /** waitStatus value to indicate thread has cancelled */// 取消
      static final int CANCELLED =1;
                // 已被取消
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL    = -1;
                // 表示next节点已经park,需要被唤醒
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
      /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
                // 共享状态
      static final int PROPAGATE = -3;
</code></pre>
</blockquote>
<pre><code class="language-java">if (shouldParkAfterFailedAcquire(p, node) &amp;&amp;
                  parkAndCheckInterrupt())
                  interrupted = true;
</code></pre>
<p>如果前一个节点的waitState是0,会被CAS转为-1,然后返回false,进而不会执行parkAndCheckInterrupt(),继续for的无限循环,这里有可能出现第四次尝试</p>
<p>如果前一个节点的waitState是-1,该函数返回一个true,也就可以继续执行<code>parkAndCheckInterrupt()</code></p>
<pre><code class="language-java">    private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);
      return Thread.interrupted();
    }
</code></pre>
<p>当前线程进入park状态</p>
<p>‍</p>
<p>至此我们完成了这个的lock过程</p>
<p>‍</p>
<h2 id="unlock方法">unlock方法</h2>
<p>unlock()也是公平锁以及非公平锁都有的方法,同样继承了Sync</p>
<pre><code class="language-java">    public void unlock() {
      sync.release(1);
    }
</code></pre>
<p>Sync的release方法</p>
<pre><code class="language-java">    public final boolean release(int arg) {
      if (tryRelease(arg)) {
            Node h = head;
            if (h != null &amp;&amp; h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
      }
      return false;
    }
</code></pre>
<p>首先尝试tryRelease方法</p>
<pre><code class="language-java">      protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
      }
</code></pre>
<p>如果成功醒过来,该线程依然处于一种park的位置上,即parkAndCheckInterrupt这个方法上,这个方法返回是否被中断<code>ReentrantLock</code>这个锁仅获取中断信息,而不会做出任何操作</p>
<pre><code class="language-java">final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head &amp;&amp; tryAcquire(arg)) {
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &amp;&amp;
                  parkAndCheckInterrupt())
                  interrupted = true;
            }
      } finally {
            if (failed)
                cancelAcquire(node);
      }
    }
</code></pre>
<p>苏醒过来之后,继续for循环,尝试获取锁,失败之后会接着park,成功就会获取锁,并返回中断状态,在acquire中决定自我中断</p>
<pre><code class="language-java">      final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc &lt; 0) // overflow
                  throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
      }
</code></pre>
<p>并将setExclusiveOwnerThread传入当前线程,返回为真,因此在TryRelease方法里的<code>Thread.currentThread() != getExclusiveOwnerThread()</code>一定为假,不会抛出异常,并设置free为false,当c也就是资源的state如果是0</p>
<pre><code class="language-java">                        if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
</code></pre>
<p>c如果是0,即没有线程占用资源,setExclusiveOwnerThread将锁的线程设置为空,如果不为0,也就是重入锁仅仅解锁一次,c依然存在多个,设置c为新的state值,然会free值(资源锁的使用情况)</p>
<pre><code class="language-java">    public final boolean release(int arg) {
      if (tryRelease(arg)) {
            Node h = head;
            if (h != null &amp;&amp; h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
      }
      return false;
    }
</code></pre>
<pre><code class="language-java">    private void unparkSuccessor(Node node) {
      /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.It is OK if this
         * fails or if status is changed by waiting thread.
         */
      int ws = node.waitStatus;
      if (ws &lt; 0)
            compareAndSetWaitStatus(node, ws, 0);

      /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
      Node s = node.next;、
                // 如果下一个节点的状态为取消或者为空,从后向前找最后一个满足条件的,赋值为s
      if (s == null || s.waitStatus &gt; 0) {
            s = null;
            for (Node t = tail; t != null &amp;&amp; t != node; t = t.prev)
                if (t.waitStatus &lt;= 0)
                  s = t;
      }
                // s不为空的话作为下一个被唤醒的节点,尝试唤醒
      if (s != null)
            LockSupport.unpark(s.thread);
    }
</code></pre>
<p>此时,当前节点为头节点,调用unparkSuccessor()方法,获取头节点的下一个节点</p><br><br>
来源:https://www.cnblogs.com/huiduthinker/p/19578752/reentrantlock-implemented-based-on-aqs-z1wrekj
頁: [1]
查看完整版本: 基于AQS实现的ReentrantLock