刚入行java开发或者说在JDK1.5之前,遇到并发问题时,第一印象想到的都是synchronized
同步锁。
关于同步锁的使用,我们在前面文章中已经聊过。
其中,我们在使用synchronized
的时候,会存在以下几个问题:
-
synchronized是不可中断锁,需要线程执行完才能释放锁。 -
synchronized是非公平锁。 -
在Synchronized优化以前,synchronized的性能非常不乐观,但是自从Synchronized引入了偏向锁,轻量级锁(自旋锁)后,性能有所提升。 -
synchronized的细粒度和灵活度不够好。 -
….
于是,Doug Lea
大师就搞了个ReentrantLock
,ReentrantLock诞生于JDK1.5
,位于java.util.concurrent
(简称JUC
)包目录下。
努力从今天开始,成功从“
零
”开始
本文内容
大致分为以下几个核心知识点:
1.简单使用ReentrantLock
2.公平锁和非公平锁
3.源码分析AQS
中的state
和线程等待队列(CLH
)
4.公平锁和非公平锁获取锁的方法和不同点
5.超时获取锁是如何实现的
6.锁是怎么释放的
文章有点长,因为涉及到部分源代码分析。
重点知识点
state、线程同步队列、CAS
、死循环
简单使用ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
test();
}
},"线程1").start();
new Thread(new Runnable() {
@Override
public void run() {
test();
}
},"线程2").start();
}
public static void test() {
try {
//获取锁
lock.lock();
System.out.println(Thread.currentThread().getName() + "获取到锁了");
//业务代码,使用部分花费100毫秒
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁放在finally中。
lock.unlock();
System.out.println(Thread.currentThread().getName() + "释放了锁");
}
}
}
输出
线程1获取到锁了
线程1释放了锁
线程2获取到锁了
线程2释放了锁
效果和synchronized的一样,线程1获取到锁了,线程2需要等待线程1释放锁后才可以获取锁。
注意
为了防止锁不被释放,从而造成死锁,所以强烈建议把锁释放放在finally模块中。
ReentrantLock
整体介绍
字面意思为可重入的锁,顾名思义ReentrantLock
也是可重入锁。synchronized也是可重入锁。
进入java.util.concurrent.locks.ReentrantLock
中,发现ReentrantLock
有三个内部类;
对应UML类图结构如下:
看到了一个熟悉的身影java.util.concurrent.locks.AbstractQueuedSynchronizer
(江湖人简称为AQS
),这就是传说中的AQS
(同步队列器)。
我们都知道锁分为公平锁和非公平锁。你那么上面类图中NonfairSync
为非公平锁,FairSync
为公平锁。
lock方法和unlock方法
private final Sync sync;
//ReentrantLock无参构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
从无参构造方法中可以得知ReentrantLock
默认是非公平锁。
进去NonfairSync
中的lock
//上锁,使用final修饰,次方法不能被重写
final void lock() {
//通过cas操作来修改state状态,表示争抢锁的操作
if (compareAndSetState(0, 1))
//设置当前获得锁状态的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//尝试去获取锁
acquire(1);
}
以下三个方法都是AQS
中的。
compareAndSetState();//通过cas操作来修改state状态,表示争抢锁的操作
setExclusiveOwnerThread();//设置当前获得锁状态的线程
acquire();//尝试去获取锁
更别说ReentrantLock
中的unlock
方法了,直接都是从AQS
里开始的
//解锁
public void unlock() {
sync.release(1);
}
再到AQS
中release方法
//释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//空方法留给子类自己去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
没法聊了,还是先暂时搁置到这里,因为公平锁和非公平锁都是继承自java.util.concurrent.locks.AbstractQueuedSynchronizer
。所以我们不得不先说说AQS
,先把AQS
搞清楚了上面的ReentrantLock
方可继续。
深入AQS
AbstractQueuedSynchronizer
类如其名,抽象的队列式的同步器,AQS
定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch
等并发工具类,还有阻塞队列和线程池的实现都有使用到AQS
。因为是抽象类,就可以想象到很有可能有抽象方法和已经实现好的方法。看源码中你会发现很多方法是空的,需要子类自己去实现。
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
state
这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:
在AQS
中有个很重要的角色
private volatile int state;
//get set
//通过cas操作来修改state状态,表示争抢锁的操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这段代码其实逻辑很简单,就是通过**CAS
乐观锁**的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的state的值和预期值expect相等,则替换为update,更新成功返回true,否则返回false。
这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,一级涉及到state这个属性的意义。关于Unsafe类后面会专门写一篇文章来讲它。
细心的朋友估计应该注意到了,前面lock和unlock方法中涉及到到三个方法,方法参数都是1。
compareAndSetState(0, 1)
acquire(1);
release(1);
-
当state=0时,表示无锁状态 -
当state>0时,表示已经有线程获得了锁,也就是state=1。但是因为 ReentrantLock
允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁。
注意:
AQS
不同的实现类对于state字段的含义是有所差别的,这个点一定要注意。
线程等待队列
我们继续来看AQS
中的队列,该队列的实现是一个双向链表,被称为sync queue
,它表示所有等待锁的线程的集合,有点类似于我们前面介绍synchronized原理的时候说的wait set
。每个Node节点保存了当前线程的同步状态,等待状态,前驱和后继节点等。
我们前面说过,在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中,我们先来看看队列中的每一个节点是怎么个结构:
static final class Node {
// 共享模式下等待的标记
static final Node SHARED = new Node();
// 独占模式下等待的标记
static final Node EXCLUSIVE = null;
// 线程的等待状态 表示线程已经被取消
static final int CANCELLED = 1;
// 线程的等待状态 表示后继线程需要被唤醒
static final int SIGNAL = -1;
// 线程的等待状态 表示线程在Condtion上
static final int CONDITION = -2;
// 表示下一个acquireShared需要无条件的传播
static final int PROPAGATE = -3;
/**
* SIGNAL: 当前节点的后继节点处于等待状态时,如果当前节点的同步状态被释放或者取消,
* 必须唤起它的后继节点
*
* CANCELLED: 一个节点由于超时或者中断需要在CLH队列中取消等待状态,被取消的节点不会再次等待
*
* CONDITION: 当前节点在等待队列中,只有当节点的状态设为0的时候该节点才会被转移到同步队列
*
* PROPAGATE: 下一次的共享模式同步状态的获取将会无条件的传播
* waitStatus的初始值时0,使用CAS来修改节点的状态
*/
volatile int waitStatus;
/**
* 当前节点的前驱节点,当前线程依赖它来检查waitStatus,在入队的时候才被分配,
* 并且只在出队的时候才被取消(为了GC),头节点永远不会被取消,一个节点成为头节点
* 仅仅是成功获取到锁的结果,一个被取消的线程永远也不会获取到锁,线程只取消自身,
* 而不涉及其他节点
*/
volatile Node prev;
/**
* 当前节点的后继节点,当前线程释放的才被唤起,在入队时分配,在绕过被取消的前驱节点
* 时调整,在出队列的时候取消(为了GC)
* 如果一个节点的next为空,我们可以从尾部扫描它的prev,双重检查
* 被取消节点的next设置为指向节点本身而不是null,为了isOnSyncQueue更容易操作
*/
volatile Node next;
//当前节点的线程,初始化后使用,在使用后失效
volatile Thread thread;
/**
* 链接到下一个节点的等待条件,或特殊的值SHARED,因为条件队列只有在独占模式时才能被访问,
* 所以我们只需要一个简单的连接队列在等待的时候保存节点,然后把它们转移到队列中重新获取
* 因为条件只能是独占性的,我们通过使用特殊的值来表示共享模式
*/
Node nextWaiter;
//如果节点处于共享模式下等待直接返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回当前节点的前驱节点,如果为空,直接抛出空指针异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 用来建立初始化的head 或 SHARED的标记
Node() {
}
// 指定线程和模式的构造方法
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 指定线程和节点状态的构造方法
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
节点信息说完了,我们继续说sync queue,AQS是怎么使用这个队列的呢,既然是双向链表,操纵它自然只需要一个头结点和一个尾节点:
// 头结点,不代表任何线程,是一个哑结点
private transient volatile Node head;
// 尾节点,每一个请求锁的线程会加到队尾
private transient volatile Node tail;
都是用了volatile修饰,以确保多线程间保证字段的可见性。
那么这个同步队列大整体就应该是这样的:
不过这里有一点我们提前说一下,在AQS中的队列是一个CLH队列,它的head节点永远是一个哑结点(dummy node), 它不代表任何线程(某些情况下可以看做是代表了当前持有锁的线程),因此head所指向的Node的thread属性永远是null。只有从次头节点往后的所有节点才代表了所有等待锁的线程。也就是说,在当前线程没有抢到锁被包装成Node扔到队列中时,即使队列是空的,它也会排在第二个,我们会在它的前面新建一个dummy节点(具体的代码我们在后面分析源码时再详细讲)。为了便于描述,下文中我们把除去head节点的队列称作是等待队列,在这个队列中的节点才代表了所有等待锁的线程:
在继续往下之前我们再对着上图总结一下Node节点各个参数的含义:
-
thread
:表示当前Node所代表的线程 -
waitStatus
:表示节点所处的等待状态,共享锁模式下只需关注三种状态:SIGNAL
CANCELLED
初始态(0)
-
prev
next
:节点的前驱和后继 -
nextWaiter
:进作为标记,值永远为null,表示当前处于独占锁模式
AQS中有三个重要的点,state、等待队列还有
当前持有锁的线程,注意这个属性是从AbstractOwnableSynchronizer
继承而来
private transient Thread exclusiveOwnerThread;
//执行的方法(方法修饰符为protected,只允许子类调用)
//设置当前锁被thread线程锁持有,重入锁条件之一就是使用这个来判断当前线程是不是锁持有的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
讲完AQS
了的整体,我们继续回到上面的lock方法,我们今天的目标是搞定ReentrantLock
。继续往下看。
深入lock方法
非公平锁
private final Sync sync;
//ReentrantLock无参构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
NonfairSync
中lock方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//上锁,使用final修饰,次方法不能被重写
final void lock() {
//方法1:通过cas操作来修改state状态,表示争抢锁的操作
if (compareAndSetState(0, 1))
//方法2:设置当前获得锁状态的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//方法3:尝试去获取锁
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//nonfairTryAcquire方法是Sync中实现的方法
return nonfairTryAcquire(acquires);
}
}
lock中的方法1和方法2在前面我们已经说过。接下来我们来说一下方法3。
这里方法3:acquire
是AQS
中的方法
//注意此时的arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquire方法中共有四个方法:
tryAcquire//尝试获得锁
addWaiter//添加一个等待节点
acquireQueued//
selfInterrupt
下面对上面上个方法逐个说明
tryAcquire方法
这个方法在AQS中是一个空方法,留个子类自己去实现。上面我们使用的是非公平锁。所以回到NonfairSync
中
//acquires=1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
这里方法nonfairTryAcquire
是Sync的方法
//acquires=1
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//把state赋值为1,exclusiveOwnerThread赋值为thread2,然后返回true
if (compareAndSetState(0, acquires)) {
//把当前持有锁的线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前持有锁的线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
//state = state +1
int nextc = c + acquires;
if (nextc 0) // overflow
throw new Error("Maximum lock count exceeded");
//把state更新
setState(nextc);
return true;
}
return false;
}
上面这段代码的逻辑大致为:
1,获取当前线程
2,获取当前state,因为state是volatile修饰的,所不用考虑线程可见性问题。
3,判断state==0,表示锁没有被持有,把state设置成1,把锁持有线程设置成当前线程,返回true表示以获取锁。
4,state!=0,判断持有锁的线程是不是当前线程
5,是当前线程,state=state+1,返回true表示获取锁成功。
现任如果线程1进来,执行完第三步就结束了。
addWaiter
方法
线程1把锁持有了,把state设置成了1,这时候线程2来执行上面的nonfairTryAcquire
方法返回false,那么这时候就会执行addWaiter
方法。把上面的获取acquire方法再贴一次:
//注意此时的arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
addWaiter(Node.EXCLUSIVE)
,再执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法。下面用一个时序图来看看当前到哪一步了。
那到底这个addWaiter
方法做了写什么呢?
static final Node EXCLUSIVE = null;
//因为Node.EXCLUSIVE=null,所以mode=null
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//1
//此时tail=null
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
enq方法
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) {
//CAS"自旋",直到成功加入队尾
if (compareAndSetHead(new Node()))
tail = head;
} else {//CAS"自旋",直到成功加入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
方法
该方法由AQS
实现,这个方法比较复杂, 主要对上面刚加入队列的Node不断尝试以下两种操作之一。
-
在前驱节点就是head节点的时候,继续尝试获取锁 -
将当前线程挂起,使CPU不再调度它
该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,继续看源码:
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
//又是一个“自旋”!
for (;;) {
//拿到前驱
final Node p = node.predecessor();
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源
//(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。所以head所指的标杆结点,
//就是当前获取到资源的那个结点或null。
setHead(node);
// setHead中node.prev已置为null,此处再将head.next置为null,
//就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
p.next = null;
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,
//那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
//那么取消结点在队列中的等待。
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法
此方法主要用于检查状态,看看自己是否真的可以去休息了,万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
parkAndCheckInterrup
方法
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。就是传说中的park方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
acquireQueued
方法总结
-
结点进入队尾后,检查状态,找到安全休息点; -
调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己; -
被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
selfInterrupt
方法
该方法由AQS实现, 用于中断当前线程。由于在整个抢锁过程中,我们都是不响应中断的。那如果在抢锁的过程中发生了中断怎么办呢,总不能假装没看见呀。AQS的做法简单的记录有没有有发生过中断,如果返回的时候发现曾经发生过中断,则在退出acquire
方法之前,就调用selfInterrupt
自我中断一下,就好像将这个发生在抢锁过程中的中断“推迟”到抢锁结束以后再发生一样。
tryAcquire
方法总结
再贴一遍tryAcquire
方法的源码
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
acquire 的流程
-
调用自定义同步器的 tryAcquire()
尝试直接去获取资源,如果成功则直接返回; -
没成功,则 addWaiter()
将该线程加入等待队列的尾部,并标记为独占模式; -
acquireQueued()
使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。 -
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt()
,将中断补上。
由于此函数是重中之重,我再用流程图总结一下:
至此,acquire()的流程终于算是告一段落了。这也就是ReentrantLock
的lock()
的流程,不信你去看其lock()源码吧,整个函数就是一条acquire(1)!
不容易滴。搞了半天还只是搞了个lock方法。
非公平锁占用锁的的整个流程
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//直接调用AQS中的acquire方法
acquire(1);
}
//公平锁和非公平锁的区别在于公平锁多方法hasQueuedPredecessors
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//如果当前线程是队列的最前面或者队列是空的,则当前线程可以回去锁
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁和非公平锁的主要区别:公平锁会考虑前面有没有线程在等待队列里,就是前面有没有线程先进来,先来先到。
深入unlock方法
释放锁和锁的公平性就没关系了,继续在ReentrantLock
中的unlock方法
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public void unlock() {
sync.release(1);
}
这里的release方法是AQS
中的release方法,此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()
。
以下是release()的源码:
//有点模板方法模式
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//空方法,留给子类去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
这里的tryRelease
方法是AQS
的子类Sync
,也就是公平锁和非公平岁的父类实现的
//释放当前线程占用的锁 releases=1
protected final boolean tryRelease(int releases) {
//计算state=state-1
int c = getState() - releases;
//判断持有锁的线程是不是当前线程
//不是抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state==0证明本次锁释放成功
if (c == 0) {
free = true;
//锁持有线程设置成null
setExclusiveOwnerThread(null);
}
//把state设置成0
setState(c);
return free;
}
释放锁还是蛮简单的。到此释放锁就结束了。
超时获取锁
在ReetrantLock
的tryLock(long timeout, TimeUnit unit)
提供了超时获取锁的功能。它的语义是在指定的时间内如果获取到锁就返回true,获取不到则返回false。这种机制避免了线程无限期的等待锁释放。
继续看看源码里是怎么实现的
//timeout时间长短
//unit时间单位
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
方法tryAcquireNanos
是AQS
中的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//判断是否已被中断
if (Thread.interrupted())
throw new InterruptedException();
//
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
关于方法tryAcquire在前面我们已经说过了。这里就不再累赘了。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//时长小于等于0就没有必要再尝试获取了,直接返回false没拿到锁
if (nanosTimeout 0L) return false;
//线程有效期期deadline
final long deadline = System.nanoTime() + nanosTimeout;
//创建一个结点node
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//死循环尝试获取---自旋
for (;;) {
//获取等待队列的前驱结点
final Node p = node.predecessor();
//如果前驱是头节点并且占用锁成功,则将当前节点变成头结点
//tryAcquire方法前面已经说过了就是尝试获取锁
if (p == head && tryAcquire(arg)) {
//成功了,将当前节点变成头结点
setHead(node);
//方便GC
p.next = null;
failed = false;
return true;
}
//计算当前还剩多少时间
nanosTimeout = deadline - System.nanoTime();
//如果这个死循环把时间耗完了还拿到就返回false,没拿到锁
if (nanosTimeout 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//再次判断是否被中断了,中断了直接抛中断异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,
* 稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。
//有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
超时获取锁的总结
如果超时时间设置小于等于0,则直接返回获取失败。线程先入等待队列,然后开始自旋,尝试获取锁,获取成功就返回,失败则在队列里找一个安全点把自己挂起直到超时时间过期。
很多人可能会问:这里为什么还需要循环呢?
因为当前线程节点的前驱状态可能不是SIGNAL,那么在当前这一轮循环中线程不会被挂起,然后更新超时时间,开始新一轮的尝试
最后
好了,到此我们今天的内容就分享结束了,如果有疑问,或者对文章中某些知识讲的不是很清楚的,可以加我微信,我们搬个板凳坐着聊。
参考
https://segmentfault.com/a/1190000015739343
https://blog.csdn.net/u010452388/article/details/90485326
https://www.cnblogs.com/waterystone/p/4920797.html
0 条评论