06、Java JUC源码分析 - locks-AQS-独占模式
AbstractQueuedSynchronizer(下面简称AQS),javadoc说明: Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues。
1、 提供一个FIFO等待队列,使用方法伪代码表示就是:;
Acquire:
if(!获取到锁){
加入队列
}
Release:
if(释放锁){
unlock等待队列头结点的thread
}
2、 内部使用volatileintstate来表示一个同步状态,这个字段既可以表示lock的状态,也可以用来表示lock的次数,例如Semaphore使用该字段表示许可次数,ReentrantLock用来表示可重入次数,我们也可以自行定义成状态值来表示线程运行状态子类继承AQS的时候必须实现Serializable;
3、 提供独占和共享2套api,一般使用就是维护一个内部类继承AQS,实现其中一套api,判断是否获取到锁ReentrantLock使用的是独占api,CountDownLatch使用的共享api子类实现的protected方法为:;
独占api,判断是否获取到锁:
tryAcquire
tryRelease
共享api,判断是否获取到锁:
tryAcquireShared
tryReleaseShared
isHeldExclusively(这个暂时不管)
4、 AQS提供了condition用来实现wait/notify功能,入ReentrantLock.newCondition();
5、 1.7版本JUC中使用到AQS的有:ReentrantLock/ReentrantReadWriteLock/Semaphore;
AQS继承了AbstractOwnableSynchronizer这个类:
//独占模式下持有锁的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
AQS的队列定义:
private transient volatile Node head;
private transient volatile Node tail;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
通过unsafe设置队列的head/tail/state/waitStatus和节点的next值,我们可以看出队列的大致结构为:
看下队列节点的具体定义:
static final class Node {
//标记节点类型是共享还是独占
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//下面4个是节点状态值
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
节点状态,对应上面几个状态值:
0:normal status
1:节点被取消,cancelled状态的节点运行过程会被清理掉
-1:需要唤醒当前节点的下一个节点
-2:用在newCondition的情况下,condition时还为维护另一个条件队列
-3:共享模式下,表示需要将release传递到队列的其他节点
*/
volatile int waitStatus;
volatile Node prev;
//next为null,并不代表改节点是tail节点,因为在加入队列时,是先pre再next的
volatile Node next;
volatile Thread thread;
//独占模式时,指向条件队列的下一个节点,或者共享模式下值为SHARED
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
一.独占模式下acquire和release
Acquire:
不响应中断的acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //挂起后唤醒返回的中断状态是true的话,这里会中断当前线程
}
由子类实现tryAcquire,AQS不提供
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
如果没有获取到,则addWaiter加入等待队列,并挂起线程:
private Node addWaiter(Node mode) {
//初始化一个node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//先尝试直接加入到尾节点后面,
//从这里也可以看出,先将node的pre指向尾节点,然后cas设置tail,再将原tail的next指向节点,
//所以可能next为空的情况存在,但是已经加入的节点的pre肯定是存在
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//失败的话,for循环loop加入
enq(node);
return node;
}
看下enq操作:
private Node enq(final Node node) {
//loop操作,tail不存在的情况会初始化一个空节点,并将head和tail都指向空节点,
//然后cas加入node,确保节点一定会加入
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在将节点加入等待队列之后,尝试挂起线程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//新加入node的pre节点
final Node p = node.predecessor();
//如果pre节点是头结点,再次重试acquire,如果成功则设置node为头结点
//需要注意的是,头结点代表的是持有锁的节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果pre不是头结点或acquire失败,则尝试挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果上面的操作发生异常,需要将node
if (failed)
cancelAcquire(node);
}
}
/**
设置头结点
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
检查是否需要挂起
这个方法就是设置新加入节点的pre节点的waitStatus为SIGNAL(肯定成功),
这样在pre节点release的时候判断是不是需要唤醒下个节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* 设置过程中会过滤Cancelled状态的节点,把cancelled状态的节点去掉
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
调用Locksupport.park阻塞线程
*/
private final boolean parkAndCheckInterrupt() {
//挂起线程
LockSupport.park(this);
//当pre节点release的时候检查状态为SIGNAL为会唤醒当前节点,这里会返回线程的中断状态
return Thread.interrupted();
}
/**
acquire和挂起过程中异常,需要取消acquire
*/
private void cancelAcquire(Node node) {
//为null直接返回
if (node == null)
return;
node.thread = null;
// 下面会跳过pre为cancelled的节点,将pre指向队列node前面第一个非取消状态节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext是队列node前面第一个非取消状态节点的下一个节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 下面检查node节点的位置,如果是tail节点,直接将pred设置为尾节点,
//然后设置之前的pred的next为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 不是tail节点
int ws;
//这里判断经过上面处理的node的pre是不是head节点
//不是head节点就要cas保证其状态为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//node的next不为null且状态不是取消状态就node节点的next关联到pred节点的next节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果node的pre是头结点,需要唤醒node的next节点
unparkSuccessor(node);
}
//将next指向自己
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 之前说过addWaiter的时候是先pre->tail->next,所以存在tail已经改变但是next还没有变化的情况
* 这里就会从tail往前查找不会null,且状态不是取消的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到就unpark,但是unpark后也不一定acquire成功,acquire那边的for就会一直loop
if (s != null)
LockSupport.unpark(s.thread);
}
接下来看下响应中断的acquireInterruptibly方法,这里会先判断先线程是否中断,中断的会直接抛出异常,没有中断再尝试请求
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly方法与之前的区别就是线程中断后直接抛出异常,不是像之前的那样return 中断状态到上一层
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//区别
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
支持中断和超时时间的
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//取一次时间
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//超时时间小于0就直接返回false
if (nanosTimeout <= 0)
return false;
//这里spinForTimeoutThreshold为static final long spinForTimeoutThreshold = 1000L;
//如果超时时间大于spinForTimeoutThreshold,park才有意思,否则直接自旋
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//底层调用unsafe.park(false,nanosTimeout)
LockSupport.parkNanos(this, nanosTimeout);
//唤醒后重新计算一下时间
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
//如果线程中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
响应中断和响应时间的acquire的其他跟acquire差不多。
Release
<span style="font-size:18px;">public final boolean release(int arg) {
//tryRelease是否可以释放由子类实现判断
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}</span>
unparkSuccessor上面已经讲过,unpark队列的第一个未取消状态的节点。
大致流程为: