在上面一篇分析ThreadExecutedPool的文章中我们看到线程池实现源码中大量使用了ReentrantLock锁,那么ReentrantLock锁的优势是什么?它又是怎么实现的呢?
ReentrantLock又名可重入锁,为什么称之为可重入锁呢?简单来说因为它允许一个线程多次取获得该锁,不过多次获取该锁之后,也需要执行同样次数的释放锁操作,否则该锁将被当前线程一直持有,导致其它线程无法获取。需要注意的是,释放锁的操作需要我们用代码来控制,它并不会自动取释放锁。在ReentrantLock中实现了两种锁fairSync和NonfairSync,即公平锁和非公平锁,今天我们就来聊聊ReentrantLock中nonfairSync锁的实现。 废话不多说,下面开始分析代码!1、1 Lock()
首先看一下lock()方法,这个方法非常重要,它也是我们获取锁的入口:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
ReentrantLock锁的初始状态为0,compareAndSetState方法将尝试获取锁并将当前锁的状态设置为1。如果成功获取了锁会调用setExclusiveOwnerThread()方法设置当前线程拥有该锁的独占访问权。
如果调用compareAndSetState()获取锁失败,则返回false并执行acquire(1)。public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们看到acquire(1)的代码中有一条if语句,当tryAcquire(1)返回false以及acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回true时,才会去执行selfInterrupt();方法。下面我们来看看tryAcquire(1)和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这两个家伙干了什么。
先看一下tryAcquire()方法。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
tryAcquire方法会去调用nonfairTryAcquire()方法。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
1、首先调用getState()方法获取当前锁状态,如果锁状态为0。表示当前锁没有被其他线程占用,这里会再次尝试去获取锁。如果成功的拿到了锁,将设置锁的拥有者为当前线程,同时返回true。如果此时返回true的话,表示当前线程成功获取到了锁,lock()方法调用成功。
2、如果当前锁状态不为0,判断当前线程是否为锁的拥有者,如果是的话,尝试将当前锁的状态值加acquires。如果当前neextc的值小于0,抛出异常。若不小于0,将当前锁的值设置为nextc。为什么说ReentrantLock为可重入锁,就体现在这里了,如果当前线程为锁的拥有者,该线程再次调用lock方法时,当前锁的状态值会加1。当然我们释放该锁的时候,也要调用相应的unlock()方法,以使得锁的state值为0,可被其他线程请求。 3、如果当前锁的值不为0且拥有锁的线程也不为当前线程则返回false。也就是tryAcquire()再次获取锁并没有成功。 值得注意的是,既然再第一次调用compareAndSetState()的时候,已经获取失败了为什么还要再调用tryAcquire()方法再获取一次呢?我们可以理解为这是一种保险机制,如果此时无法获取锁,我们将会将当前线程加入到阻塞队列中挂起等待后面被唤醒重新争夺锁。回顾一下上面的if判断条件
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
如果tryAcquire()的返回值为false,那么接下来会执行acquireQueued(addWaiter(Node.EXCLUSIVE),arg)方法。这个方法看起来比较复杂,它在acquireQueued()方法又调用了addWaiter()方法,我们先来看看addWaiter()方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
首先我们创建了一个包含了当前线程的Node节点,并将tail(tail节点即是尾节点)赋值给pred节点。如果我们第一次进来,那么tail节点肯定为空,将会去执行enq(node)方法。如果tail不为空,那么接下来的三句代码干了什么呢?
先回忆一下,如果我们希望在一个双向链表的尾部新增一个节点,应该如何操作,大致应该有如下三步:- node.prev = pred; node节点的前驱指向尾节点
- pred.next = node; 将尾节点的后继设置为当前节点
- tail = node; 将node节点设置为尾节点 我们再看一下详细代码:
- node.prev=pred; 当前pred节点代表的是尾节点,也就是说设置node节点的前驱为当前尾节点。
- if (compareAndSetTail(pred, node)),我们看下compareAndSetTail(pred, node)方法。
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
compareAndSetTail的原理其实就是CAS算法,将期望值和内存地址为tailOffset上的值进行比较,如果两者相同,则更新tailOffset上的值为最新值update。其实也就是如果tailOffset上的值和pred(老的尾节点)的值相同,则将尾节点更新为新的node节点。
- 将原尾节点的后继设置为当前节点。 其实上面三步实现的功能和在双向链表尾部新增一个节点的功能大致相同,只是顺序略有调整。
接下来看一下enq(node)方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq代码中有一个死循环for(;;),在这个循环中会执行以下操作:
- 将tail节点赋给t节点,t节点当前即为尾节点。
- 如果为t节点为空,将当前节点设置为头节点,并将头节点赋值给尾节点。相当于头尾节点都指向了新建节点。
- 如果尾节点不为空,将当前节点node的前驱指向尾节点,将node节点设置为新的tail节点。同时将原尾节点的后继设置为当前节点,相当于将当前node节点链接到原尾节点之后,插入到链表中。
看到现在,我们大致明白了addWaiter()方法其实就是将当前节点添加到链表尾部的一个方法。
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
再跳回到这句代码,现在已经将线程添加到队列中了。那么acquireQueued方法到底又是干什么的呢?我们接着分析。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireQueued方法中有一个无限循环,这个循环干了什么,它的用处是什么呢?我们接着看
final Node p = node.predecessor();
获取当前node节点的前驱节点,并赋值给p
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
- 如果p节点为头结点,同时当前线程可以获取到锁,则调用setHead(node)方法。setHead方法其实即是将当前获取到了锁的节点设置为头结点。如果当前节点已经获取到了锁,那么该节点也无需再保存当前线程了。此时当前线程已经获取到了锁,将p节点的后继节点设置为null,以方便jvm自动回收。最后跳出当前循环。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
- 如果当前节点的前驱不为头节点,或当前节点无法获取到锁。执行shouldParkAfterFailedAcquire()判断当前线程是否需要被挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
上面的代码可以总结为一下三句话:
- 如果前驱节点的waitStatus等于Node.SIGNAL,表示前驱节点已经准备好去唤醒后续节点。因此我们可以安全的将当前线程挂起以待被唤醒获得锁。返回true,当前线程在后续代码中被挂机。
- 如果前驱节点的waiteStatus状态大于0(只有CANCEL状态值才会大于0),从当前节点一直往前找,直到找到一个waitStatus状态小于0的节点。将找到节点的后继设置为当前节点。
- 如果前驱节点的状态即不为SIGNAL也大于0,将前驱节点的状态设置为SIGNAL。
到此为止,我们已经知道了尝试去获取锁的线程是如何被放入到阻塞队列中并挂起的。接下来我们来看看获取到锁的线程是如何释放锁的。
1、2 unlock()
上面我们已经较为清晰的理了一遍ReentrantLock获取锁的思路,下面我们开始分析一下如何释放获取到的锁。
在ReentrantLock里面有如下代码,我们跟踪一下release()方法干了什么。public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在release()方法中,首先尝试去执行tryRelease()方法。看到这个名字我们就知道它的用处是尝试去释放当前获取到的锁。计入tryRelease()方法看一下它到底干了什么。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
- 获取当前锁的状态值并减去传递过来的releases变量,不难发现。如果我们在获取锁的过程中会将当前锁的state值加1,因此我们在释放的时候也需要相对应的将state的值减去1。
- 如果当前线程不为锁的拥有者,抛出异常。我们的目的是释放当前线程拥有的锁,这句代码也很好理解。
- 创建一个初始值为false的变量free用来标识当前锁的操作是否被释放。
- c的值等于当前锁的状态值减去releasesd。由于ReentrantLock锁是可重入锁,因此锁的state值有可能是大于1的值。然而当state的值不为0的时候,我们可以任务当前线程仍然持有该锁,其它线程依然不能够去调用lock()方法去获取该锁。如果c的值等于0,我们任务当前线程已经释放了该锁,其它线程可以开始争夺它了。因为我们将free的值设置为true,同时将当前锁的拥有者设置为null。
- setState(c);吴磊当前锁的state值是否为0,我们都需要去更新state值。
- 返回free值。如果free的值为true,表示当前线程已不再拥有该锁,我们可以去唤醒后继线程来争夺该锁了。如果free的值为false,表示该锁仍然被当前线程所持有。
继续看上面的代码
Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true;
如果当前线程所持有的锁的state值为0,那么此时需要唤醒当前线程的后继节点去争夺该锁。我们看一下这段代码干了什么:
- 新建一个h节点,并将其赋值为head节点。
- 如果h节点为空,意味着当前头节点为空,一般情况头结点我们可以理解为即是当前拥有锁的线程,既然当前节点为空,那么也就没有办法释放锁了。同时如果头节点的waitStatus为0,正常拥有锁或者已经释放锁打算去唤醒其它线程的节点,不会为0状态。如果头节点不为空,且waitStatus也不为0,调用unparkSuccessor(h);方法去唤醒后继节点。
- 返回true
那么unparkSuccessor方法到底做了什么,它为什么可以唤醒后继节点呢?我们下面就来看看:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
- 定义一个ws变量用于当前节点的waits状态。
- 如果waitStatus值小于0,将当前node节点的值设置为0;当然有人会问如果waitStatus的值大于0怎么办呢?如果waitSatus的值大于0,那么它只可能为CANCEL状态,也就是说当前节点线程已经被取消,自然也不用去唤醒别人了。
- 定义一个s节点拥有保存头节点的后继。
- 如果s==null,表示后继不存在,那么我们就要去尝试找找有没有其它线程等待被唤醒了;如果s.waitStatus小于0,小于0也就表示当前节点不为CANCLE状态,可以被唤醒。那么如果当前节点为空或者waitStatus值大于0时,如何去获取后续节点呢?下面的一大串代码其就是从当前队列的尾节点开始往前找,找到一个离当前节点最近的且waitStatus值小于0的节点等待唤醒的过程;
- 如果s节点不为null,调用LockSupport.unpark(s.thread);去唤醒该节点!