网站建设资讯

NEWS

网站建设资讯

死磕java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

问题

(1)重入锁是什么?

成都创新互联公司专注于南芬网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供南芬营销型网站建设,南芬网站制作、南芬网页设计、南芬网站官网定制、微信小程序开发服务,打造南芬网络公司原创品牌,更为您提供南芬网站排名全网营销落地服务。

(2)ReentrantLock如何实现重入锁?

(3)ReentrantLock为什么默认是非公平模式?

(4)ReentrantLock除了可重入还有哪些特性?

简介

Reentrant = Re + entrant,Re是重复、又、再的意思,entrant是enter的名词或者形容词形式,翻译为进入者或者可进入的,所以Reentrant翻译为可重复进入的、可再次进入的,因此ReentrantLock翻译为重入锁或者再入锁。

重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁。

在Java中,除了ReentrantLock以外,synchronized也是重入锁。

那么,ReentrantLock的可重入性是怎么实现的呢?

继承体系

ReentrantLock实现了Lock接口,Lock接口里面定义了java中锁应该实现的几个方法:

// 获取锁
void lock();
// 获取锁(可中断)
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,如果没获取到锁,就返回false
boolean tryLock();
// 尝试获取锁,如果没获取到锁,就等待一段时间,这段时间内还没获取到锁就返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 条件锁
Condition newCondition();

Lock接口中主要定义了 获取锁、尝试获取锁、释放锁、条件锁等几个方法。

源码分析

主要内部类

ReentrantLock中主要定义了三个内部类:Sync、NonfairSync、FairSync。

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}

(1)抽象类Sync实现了AQS的部分方法;

(2)NonfairSync实现了Sync,主要用于非公平锁的获取;

(3)FairSync实现了Sync,主要用于公平锁的获取。

在这里我们先不急着看每个类具体的代码,等下面学习具体的功能点的时候再把所有方法串起来。

主要属性

private final Sync sync;

主要属性就一个sync,它在构造方法中初始化,决定使用公平锁还是非公平锁的方式获取锁。

主要构造方法

// 默认构造方法
public ReentrantLock() {
    sync = new NonfairSync();
}
// 自己可选择使用公平锁还是非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

(1)默认构造方法使用的是非公平锁;

(2)第二个构造方法可以自己决定使用公平锁还是非公平锁;

上面我们分析了ReentrantLock的主要结构,下面我们跟着几个主要方法来看源码。

lock()方法

彤哥贴心地在每个方法的注释都加上方法的来源。

公平锁

这里我们假设ReentrantLock的实例是通过以下方式获得的:

ReentrantLock reentrantLock = new ReentrantLock(true);

下面的是加锁的主要逻辑:

// ReentrantLock.lock()
public void lock() {
    // 调用的sync属性的lock()方法
    // 这里的sync是公平锁,所以是FairSync的实例
    sync.lock();
}
// ReentrantLock.FairSync.lock()
final void lock() {
    // 调用AQS的acquire()方法获取锁
    // 注意,这里传的值为1
    acquire(1);
}
// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 尝试获取锁
    // 如果失败了,就排队
    if (!tryAcquire(arg) &&
        // 注意addWaiter()这里传入的节点模式为独占模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 查看当前状态变量的值
    int c = getState();
    // 如果状态变量的值为0,说明暂时还没有人占有锁
    if (c == 0) {
        // 如果没有其它线程在排队,那么当前线程尝试更新state的值为1
        // 如果成功了,则说明当前线程获取了锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 当前线程获取了锁,把自己设置到exclusiveOwnerThread变量中
            // exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
            setExclusiveOwnerThread(current);
            // 返回true说明成功获取了锁
            return true;
        }
    }
    // 如果当前线程本身就占有着锁,现在又尝试获取锁
    // 那么,直接让它获取锁并返回true
    else if (current == getExclusiveOwnerThread()) {
        // 状态变量state的值加1
        int nextc = c + acquires;
        // 如果溢出了,则报错
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 设置到state中
        // 这里不需要CAS更新state
        // 因为当前线程占有着锁,其它线程只会CAS把state从0更新成1,是不会成功的
        // 所以不存在竞争,自然不需要使用CAS来更新
        setState(nextc);
        // 当线程获取锁成功
        return true;
    }
    // 当前线程尝试获取锁失败
    return false;
}
// AbstractQueuedSynchronizer.addWaiter()
// 调用这个方法,说明上面尝试获取锁失败了
private Node addWaiter(Node mode) {
    // 新建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    // 这里先尝试把新节点加到尾节点后面
    // 如果成功了就返回新节点
    // 如果没成功再调用enq()方法不断尝试
    Node pred = tail;
    // 如果尾节点不为空
    if (pred != null) {
        // 设置新节点的前置节点为现在的尾节点
        node.prev = pred;
        // CAS更新尾节点为新节点
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,把旧尾节点的下一个节点指向新节点
            pred.next = node;
            // 并返回新节点
            return node;
        }
    }
    // 如果上面尝试入队新节点没成功,调用enq()处理
    enq(node);
    return node;
}
// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    // 自旋,不断尝试
    for (;;) {
        Node t = tail;
        // 如果尾节点为空,说明还未初始化
        if (t == null) { // Must initialize
            // 初始化头节点和尾节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果尾节点不为空
            // 设置新节点的前一个节点为现在的尾节点
            node.prev = t;
            // CAS更新尾节点为新节点
            if (compareAndSetTail(t, node)) {
                // 成功了,则设置旧尾节点的下一个节点为新节点
                t.next = node;
                // 并返回旧尾节点
                return t;
            }
        }
    }
}
// AbstractQueuedSynchronizer.acquireQueued()
// 调用上面的addWaiter()方法使得新节点已经成功入队了
// 这个方法是尝试让当前节点来获取锁的
final boolean acquireQueued(final Node node, int arg) {
    // 失败标记
    boolean failed = true;
    try {
        // 中断标记
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 当前节点的前一个节点
            final Node p = node.predecessor();
            // 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
            // 调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 尝试获取锁成功
                // 这里同时只会有一个线程在执行,所以不需要用CAS更新
                // 把当前节点设置为新的头节点
                setHead(node);
                // 并把上一个节点从链表中删除
                p.next = null; // help GC
                // 未失败
                failed = false;
                return interrupted;
            }
            // 是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 真正阻塞的方法
                parkAndCheckInterrupt())
                // 如果中断了
                interrupted = true;
        }
    } finally {
        // 如果失败了
        if (failed)
            // 取消获取锁
            cancelAcquire(node);
    }
}
// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 这个方法是在上面的for()循环里面调用的
// 第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false
// 第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 上一个节点的等待状态
    // 注意Node的waitStatus字段我们在上面创建Node的时候并没有指定
    // 也就是说使用的是默认值0
    // 这里把各种等待状态再贴出来
    //static final int CANCELLED =  1;
    //static final int SIGNAL    = -1;
    //static final int CONDITION = -2;
    //static final int PROPAGATE = -3;
    int ws = pred.waitStatus;
    // 如果等待状态为SIGNAL(等待唤醒),直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果前一个节点的状态大于0,也就是已取消状态
    if (ws > 0) {
        // 把前面所有取消状态的节点都从链表中删除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
        // 这里可以简单地理解为把初始状态0设置为SIGNAL
        // CONDITION是条件锁的时候使用的
        // PROPAGATE是共享锁使用的
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程
    // 底层调用的是Unsafe的park()方法
    LockSupport.park(this);
    // 返回是否已中断
    return Thread.interrupted();
}

看过之前彤哥写的【死磕 java同步系列之自己动手写一个锁Lock】的同学看今天这个加锁过程应该思路会比较清晰。

下面我们看一下主要方法的调用关系,可以跟着我的 → 层级在脑海中大概过一遍每个方法的主要代码:

ReentrantLock#lock()
->ReentrantLock.FairSync#lock() // 公平模式获取锁
  ->AbstractQueuedSynchronizer#acquire() // AQS的获取锁方法
    ->ReentrantLock.FairSync#tryAcquire() // 尝试获取锁
    ->AbstractQueuedSynchronizer#addWaiter()  // 添加到队列
      ->AbstractQueuedSynchronizer#enq()  // 入队
    ->AbstractQueuedSynchronizer#acquireQueued() // 里面有个for()循环,唤醒后再次尝试获取锁
      ->AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 检查是否要阻塞
      ->AbstractQueuedSynchronizer#parkAndCheckInterrupt()  // 真正阻塞的地方

获取锁的主要过程大致如下:

(1)尝试获取锁,如果获取到了就直接返回了;

(2)尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;

(3)然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;

(4)如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);

(5)调用parkAndCheckInterrupt()阻塞当前线程;

(6)如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;

(7)如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。

以上就是整个公平锁获取锁的过程,下面我们看看非公平锁是怎么获取锁的。

非公平锁
// ReentrantLock.lock()
public void lock() {
    sync.lock();
}
// ReentrantLock.NonfairSync.lock()
// 这个方法在公平锁模式下是直接调用的acquire(1);
final void lock() {
    // 直接尝试CAS更新状态变量
    if (compareAndSetState(0, 1))
        // 如果更新成功,说明获取到锁,把当前线程设为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// ReentrantLock.NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 调用父类的方法
    return nonfairTryAcquire(acquires);
}
// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果状态变量的值为0,再次尝试CAS更新状态变量的值
        // 相对于公平锁模式少了!hasQueuedPredecessors()条件
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相对于公平锁,非公平锁加锁的过程主要有两点不同:

(1)一开始就尝试CAS更新状态变量state的值,如果成功了就获取到锁了;

(2)在tryAcquire()的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队呢;

总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。

lockInterruptibly()方法

支持线程中断,它与lock()方法的主要区别在于lockInterruptibly()获取锁的时候如果线程中断了,会抛出一个异常,而lock()不会管线程是否中断都会一直尝试获取锁,获取锁之后把自己标记为已中断,继续执行自己的逻辑,后面也会正常释放锁。

题外话:

线程中断,只是在线程上打一个中断标志,并不会对运行中的线程有什么影响,具体需要根据这个中断标志干些什么,用户自己去决定。

比如,如果用户在调用lock()获取锁后,发现线程中断了,就直接返回了,而导致没有释放锁,这也是允许的,但是会导致这个锁一直得不到释放,就出现了死锁。

lock.lock();

if (Thread.currentThread().interrupted()) {
    return ;
}

lock.unlock();

当然,这里只是举个例子,实际使用肯定是要把lock.lock()后面的代码都放在try...finally...里面的以保证锁始终会释放,这里主要是为了说明线程中断只是一个标志,至于要做什么完全由用户自己决定。

tryLock()方法

尝试获取一次锁,成功了就返回true,没成功就返回false,不会继续尝试。

// ReentrantLock.tryLock()
public boolean tryLock() {
    // 直接调用Sync的nonfairTryAcquire()方法
    return sync.nonfairTryAcquire(1);
}
// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock()方法比较简单,直接以非公平的模式去尝试获取一次锁,获取到了或者锁本来就是当前线程占有着就返回true,否则返回false。

tryLock(long time, TimeUnit unit)方法

尝试获取锁,并等待一段时间,如果在这段时间内都没有获取到锁,就返回false。

// ReentrantLock.tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 调用AQS中的方法
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// AbstractQueuedSynchronizer.tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果线程中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先尝试获取一次锁
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
// AbstractQueuedSynchronizer.doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果时间已经到期了,直接返回false
    if (nanosTimeout <= 0L)
        return false;
    // 到期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 如果到期了,就直接返回false
            if (nanosTimeout <= 0L)
                return false;
            // spinForTimeoutThreshold = 1000L;
            // 只有到期时间大于1000纳秒,才阻塞
            // 小于等于1000纳秒,直接自旋解决就得了
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 阻塞一段时间
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

tryLock(long time, TimeUnit unit)方法在阻塞的时候加上阻塞时间,并且会随时检查是否到期,只要到期了没获取到锁就返回false。

unlock()方法

释放锁。

// java.util.concurrent.locks.ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.release
public final boolean release(int arg) {
    // 调用AQS实现类的tryRelease()方法释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果头节点不为空,且等待状态不是0,就唤醒下一个节点
        // 还记得waitStatus吗?
        // 在每个节点阻塞之前会把其上一个节点的等待状态设为SIGNAL(-1)
        // 所以,SIGNAL的准确理解应该是唤醒下一个等待的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantLock.Sync.tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果当前线程不是占有着锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果状态变量的值为0了,说明完全释放了锁
    // 这也就是为什么重入锁调用了多少次lock()就要调用多少次unlock()的原因
    // 如果不这样做,会导致锁不会完全释放,别的线程永远无法获取到锁
    if (c == 0) {
        free = true;
        // 清空占有线程
        setExclusiveOwnerThread(null);
    }
    // 设置状态变量的值
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    // 注意,这里的node是头节点

    // 如果头节点的等待状态小于0,就把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 头节点的下一个节点
    Node s = node.next;
    // 如果下一个节点为空,或者其等待状态大于0(实际为已取消)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果下一个节点不为空,则唤醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁的过程大致为:

(1)将state的值减1;

(2)如果state减到了0,说明已经完全释放锁了,唤醒下一个等待着的节点;

未完待续,下一章我们继续学习ReentrantLock中关于条件锁的部分

彩蛋

为什么ReentrantLock默认采用的是非公平模式?

答:因为非公平模式效率比较高。

为什么非公平模式效率比较高?

答:因为非公平模式会在一开始就尝试两次获取锁,如果当时正好state的值为0,它就会成功获取到锁,少了排队导致的阻塞/唤醒过程,并且减少了线程频繁的切换带来的性能损耗。

非公平模式有什么弊端?

答:非公平模式有可能会导致一开始排队的线程一直获取不到锁,导致线程饿死。

推荐阅读

  1. 死磕 java同步系列之AQS起篇

  2. 死磕 java同步系列之自己动手写一个锁Lock

  3. 死磕 java魔法类之Unsafe解析

  4. 死磕 java同步系列之JMM(Java Memory Model)

  5. 死磕 java同步系列之volatile解析

  6. 死磕 java同步系列之synchronized解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。


当前标题:死磕java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁
标题URL:http://njwzjz.com/article/isgpdp.html