ReentrantLock实现原理

因为ReentrantLock和ReentrantReadWriteLock的实现原理基本相同,就单看ReentrantLock。


第一步先看加锁
final void lock() {

if (compareAndSetState(0, 1)) // 第一次尝试CAS指令来获取锁,若是失败的话,再通过acquire(1)方法获取锁。

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(1);

}
acquire(1)方法的实现是一个非公平的偏向锁,

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 这里若是获取锁失败的话,那么加入等待队列,是一个CLH的实现!

selfInterrupt();

}
tryAcquire 就是JAVA的偏向锁实现,,先看看最tryAcquire的实现:

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) { // c==0 代表当前没有没有线程竞争锁,因为是可重入锁,所以通过CPU的指令对acquires+1 在释放锁的时候会对acquires-1

if (compareAndSetState(0, acquires)) { // 这里并没有指定是队列中的第一个元素,是所有可竞争的线程,所以才是它不公平的地方,排队不一定有用!

setExclusiveOwnerThread(current); //标示当前线程为获取锁的线程,并返回true

return true;

}

}

else if (current == getExclusiveOwnerThread()) { // 这里是偏向锁实现的关键,重入后还是自己持有锁,那么不去执行CAS操作获取锁等操作导致的时间延迟,

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}
看看addWaiter(Node.EXCLUSIVE), arg) 是如何实现的吧

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) { // 若是尾节点不为空的话,那么设置新进入的线程上一个节点是尾节点,同时通过CAS将当前线程设置为尾节点!

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node); // 若是尾节点为空,具体看看enq方法

return node;

}

&nbsp;
private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize // 若是尾节点为空的话,那么新建一个假的节点并设置为首节点

Node h = new Node(); // Dummy header

h.next = node;

node.prev = h; // 将首节点设置为当前节点的前节点

if (compareAndSetHead(h)) {

tail = node;

return h;

}

}

else { // 若是不为空,直接设置当前节点为尾节点

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}
总的说 addWaiter方法就是将线程加入到队列的尾部。

在回过头来看看  acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 中 acquireQueued方法做的工作:
final boolean acquireQueued(final Node node, int arg) {
try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) { // 若是上一个节点是头结点的话,那么尝试当前线程获取锁,因为你后面没有线程了(至于为什么后面没有线程在后面会解释)

setHead(node); // 执行了当前线程后,并将当前线程设置为头线程,置后节点为空。

p.next = null; // help GC

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) && // 若是自己的前节点不是头节点,或者没有竞争到锁的话,那么park当前线程

parkAndCheckInterrupt())

interrupted = true;

}

} catch (RuntimeException ex) {

cancelAcquire(node);

throw ex;

}

}
先看 shouldParkAfterFailedAcquire干了什么

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus; //在第一次进入的时候头节点是空对象,所以它的waitStatus就是默认值0

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park

*/

return true;

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0); // 清理那些一直未拿到锁,并最终抛出异常的线程!

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 这个时候就会尝试将头结点的waitStatus设置为SIGNAL状态,就是-1,

}

return false;

}
在第一次进入的时候头节点是空对象,所以它的waitStatus就是默认值0,在结果返回false后,在外层的 acquireQueued方法中是一个for(;;),下次再次进入的话 ,因为前节点已经是的waitStatus已经变成了SIGNAL,所以会返回true,那么就会执行
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 中的 parkAndCheckInterrupt,这个方法具体如下所示:
private final boolean parkAndCheckInterrupt() {

LockSupport.park(this); // 简单的park当前没有抢到锁的线程!

return Thread.interrupted();

}
此刻就会进入:
final boolean acquireQueued(final Node node, int arg) {
try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} catch (RuntimeException ex) {

cancelAcquire(node); // 在前面park自己的线程后

throw ex;

}

}
因为总有一个线程会抢到锁,所以再来看看unlock会干什么事情。
public final boolean release(int arg) {
if (tryRelease(arg)) { //因为是可重入锁,那么就先减少1

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h); 进行队列中线程的unpark,从头部节点开始,从这里看到,

return true;

}

return false;

}

&nbsp;

p
private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0); // 将头部节点置为 0 ,因为在 final boolean acquireQueued(final Node node, int arg) {函数中,我们是将一个拿到锁的线程置为了头部节点,所以需要将那个原本执行的线程重置为一个初始状态!

&nbsp;

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev) // 这里就是最关键的地方了,找到当前执行线程的下一个节点,参数传入的是头节点,那么就是找到头节点后一个处在可运行状态的节点,并进行unpark,到这里我们就可以清晰的看出整体中是一个不完全的先进先出队列,不完全是因为就算unpark了你,当你还需要去跟其他未进入队列的线程竞争,若是竞争失败的话,还的乖乖的回到原点,继续等待!

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

作者: inter12

在这苦短的人生中,追求点自己的简单快乐

发表评论

电子邮件地址不会被公开。 必填项已用*标注