本小节介绍锁释放lock.unlock()。
release/tryrelease
unlock操作实际上就调用了aqs的release操作,释放持有的锁。
public final boolean release(int arg) {
if (tryrelease(arg)) {
node h = head;
if (h != null && h.waitstatus != 0)
unparksuccessor(h);
return true;
}
return false;
}
前面提到过tryrelease(arg)操作,此操作里面总是尝试去释放锁,如果成功,说明锁确实被当前线程持有,那么就看aqs队列中的头结点是否为空并且能否被唤醒,如果可以的话就唤醒继任节点(下一个非cancelled节点,下面会具体分析)。
对于独占锁而言,java.util.concurrent.locks.reentrantlock.sync.tryrelease(int)展示了如何尝试释放锁(tryrelease)操作。
protected final boolean tryrelease(int releases) {
int c = getstate() - releases;
if (thread.currentthread() != getexclusiveownerthread())
throw new illegalmonitorstateexception();
boolean free = false;
if (c == 0) {
free = true;
setexclusiveownerthread(null);
}
setstate(c);
return free;
}
整个tryrelease操作是这样的:
- 判断持有锁的线程是否是当前线程,如果不是就抛出illegalmonitorstateexeception(),因为一个线程是不能释放另一个线程持有的锁(否则锁就失去了意义)。否则进行2。
- 将aqs状态位减少要释放的次数(对于独占锁而言总是1),如果剩余的状态位0(也就是没有线程持有锁),那么当前线程就是最后一个持有锁的线程,清空aqs持有锁的独占线程。进行3。
- 将剩余的状态位写回aqs,如果没有线程持有锁就返回true,否则就是false。
参考上一节的分析就可以知道,这里c==0决定了是否完全释放了锁。由于reentrantlock是可重入锁,因此同一个线程可能多重持有锁,那么当且仅当最后一个持有锁的线程释放锁是才能将aqs中持有锁的独占线程清空,这样接下来的操作才需要唤醒下一个需要锁的aqs节点(node),否则就只是减少锁持有的计数器,并不能改变其他操作。
当tryrelease操作成功后(也就是完全释放了锁),release操作才能检查是否需要唤醒下一个继任节点。这里的前提是aqs队列的头结点需要锁(waitstatus!=0),如果头结点需要锁,就开始检测下一个继任节点是否需要锁操作。
在上一节中说道acquirequeued操作完成后(拿到了锁),会将当前持有锁的节点设为头结点,所以一旦头结点释放锁,那么就需要寻找头结点的下一个需要锁的继任节点,并唤醒它。
private void unparksuccessor(node node) {
//此时node是需要是需要释放锁的头结点
//清空头结点的waitstatus,也就是不再需要锁了
compareandsetwaitstatus(node, node.signal, 0);
//从头结点的下一个节点开始寻找继任节点,当且仅当继任节点的waitstatus<=0才是有效继任节点,否则将这些waitstatus>0(也就是cancelled的节点)从aqs队列中剔除
//这里并没有从head->tail开始寻找,而是从tail->head寻找最后一个有效节点。
//解释在这里 http://www.blogjava.net/xylz/archive/2010/07/08/325540.html#377512
node s = node.next;
if (s == null || s.waitstatus > 0) {
s = null;
for (node t = tail; t != null && t != node; t = t.prev)
if (t.waitstatus <= 0)
s = t;
}
//如果找到一个有效的继任节点,就唤醒此节点线程
if (s != null)
locksupport.unpark(s.thread);
}
这里再一次把acquirequeued的过程找出来。对比unparksuccessor,一旦头节点的继任节点被唤醒,那么继任节点就会尝试去获取锁(在acquirequeued中node就是有效的继任节点,p就是唤醒它的头结点),如果成功就会将头结点设置为自身,并且将头结点的前任节点清空,这样前任节点(已经过时了)就可以被gc释放了。
final boolean acquirequeued(final node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final node p = node.predecessor();
if (p == head && tryacquire(arg)) {
sethead(node);
p.next = null; // help gc
return interrupted;
}
if (shouldparkafterfailedacquire(p, node) &&
parkandcheckinterrupt())
interrupted = true;
}
} catch (runtimeexception ex) {
cancelacquire(node);
throw ex;
}
}
在sethead中,将头结点的前任节点清空并且将头结点的线程清空就是为了更好的gc,防止内存泄露。
private void sethead(node node) {
head = node;
node.thread = null;
node.prev = null;
}
对比lock()操作,unlock()操作还是比较简单的,主要就是释放响应的资源,并且唤醒aqs队列中有效的继任节点。这样所就按照请求的顺序去尝试获取锁了。
整个lock()/unlock()过程完成了,我们再回头看公平锁(fairsync)和非公平锁(nonfairsync)。
公平锁和非公平锁只是在获取锁的时候有差别,其它都是一样的。
final void lock() {
if (compareandsetstate(0, 1))
setexclusiveownerthread(thread.currentthread());
else
acquire(1);
}
在上面非公平锁的代码中总是优先尝试当前是否有线程持有锁,一旦没有任何线程持有锁,那么非公平锁就霸道的尝试将锁“占为己有”。如果在抢占锁的时候失败就和公平锁一样老老实实的去排队。
也即是说公平锁和非公平锁只是在入aqs的clh队列之前有所差别,一旦进入了队列,所有线程都是按照队列中先来后到的顺序请求锁。
condition
条件变量很大一个程度上是为了解决object.wait/notify/notifyall难以使用的问题。
条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 object.wait
做的那样。
上述api说明表明条件变量需要与锁绑定,而且多个condition需要绑定到同一锁上。前面的lock中提到,获取一个条件变量的方法是lock.newcondition()。
void await() throws interruptedexception;
void awaituninterruptibly();
long awaitnanos(long nanostimeout) throws interruptedexception;
boolean await(long time, timeunit unit) throws interruptedexception;
boolean awaituntil(date deadline) throws interruptedexception;
void signal();
void signalall();
以上是condition接口定义的方法,await*对应于object.wait,signal对应于object.notify,signalall对应于object.notifyall。特别说明的是condition的接口改变名称就是为了避免与object中的wait/notify/notifyall的语义和使用上混淆,因为condition同样有wait/notify/notifyall方法。
每一个lock可以有任意数据的condition对象,condition是与lock绑定的,所以就有lock的公平性特性:如果是公平锁,线程为按照fifo的顺序从condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证fifo顺序了。
一个使用condition实现生产者消费者的模型例子如下。
package xylz.study.concurrency.lock;
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;
public class productqueue {
private final t[] items;
private final lock lock = new reentrantlock();
private condition notfull = lock.newcondition();
private condition notempty = lock.newcondition();
//
private int head, tail, count;
public productqueue(int maxsize) {
items = (t[]) new object[maxsize];
}
public productqueue() {
this(10);
}
public void put(t t) throws interruptedexception {
lock.lock();
try {
while (count == getcapacity()) {
notfull.await();
}
items[tail] = t;
if ( tail == getcapacity()) {
tail = 0;
}
count;
notempty.signalall();
} finally {
lock.unlock();
}
}
public t take() throws interruptedexception {
lock.lock();
try {
while (count == 0) {
notempty.await();
}
t ret = items[head];
items[head] = null;//gc
//
if ( head == getcapacity()) {
head = 0;
}
--count;
notfull.signalall();
return ret;
} finally {
lock.unlock();
}
}
public int getcapacity() {
return items.length;
}
public int size() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notempty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notfull的信号。
可能有人会问题,如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?
await* 操作
上一节中说过多次reentrantlock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。我们再回头看代码,不管take()还是put(),在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
public final void await() throws interruptedexception {
if (thread.interrupted())
throw new interruptedexception();
node node = addconditionwaiter();
int savedstate = fullyrelease(node);
int interruptmode = 0;
while (!isonsyncqueue(node)) {
locksupport.park(this);
if ((interruptmode = checkinterruptwhilewaiting(node)) != 0)
break;
}
if (acquirequeued(node, savedstate) && interruptmode != throw_ie)
interruptmode = reinterrupt;
if (node.nextwaiter != null)
unlinkcancelledwaiters();
if (interruptmode != 0)
reportinterruptafterwait(interruptmode);
}
上面是await()的代码片段。上一节中说过,aqs在获取锁的时候需要有一个chl的fifo队列,所以对于一个condition.await()而言,如果释放了锁,要想再一次获取锁那么就需要进入队列,等待被通知获取锁。完整的await()操作是安装如下步骤进行的:
- 将当前线程加入condition锁队列。特别说明的是,这里不同于aqs的队列,这里进入的是condition的fifo队列。后面会具体谈到此结构。进行2。
- 释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
- 自旋(while)挂起,直到被唤醒或者超时或者cacelled等。进行4。
- 获取锁(acquirequeued)。并将自己从condition的fifo队列中释放,表明自己不再需要锁(我已经拿到锁了)。
这里再回头介绍condition的数据结构。我们知道一个condition可以在多个地方被await*(),那么就需要一个fifo的结构将这些condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在condition内部就需要一个fifo的队列。
private transient node firstwaiter;
private transient node lastwaiter;
上面的两个节点就是描述一个fifo的队列。我们再结合前面提到的节点(node)数据结构。我们就发现node.nextwaiter就派上用场了!nextwaiter就是将一系列的condition.await*串联起来组成一个fifo的队列。
signal/signalall 操作
await*()清楚了,现在再来看signal/signalall就容易多了。按照signal/signalall的需求,就是要将condition.await*()中fifo队列中第一个node唤醒(或者全部node)唤醒。尽管所有node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquirequeued)。
private void dosignal(node first) {
do {
if ( (firstwaiter = first.nextwaiter) == null)
lastwaiter = null;
first.nextwaiter = null;
} while (!transferforsignal(first) &&
(first = firstwaiter) != null);
}
private void dosignalall(node first) {
lastwaiter = firstwaiter = null;
do {
node next = first.nextwaiter;
first.nextwaiter = null;
transferforsignal(first);
first = next;
} while (first != null);
}
上面的代码很容易看出来,signal就是唤醒condition队列中的第一个非cancelled节点线程,而signalall就是唤醒所有非cancelled节点线程。当然了遇到cancelled线程就需要将其从fifo队列中剔除。
final boolean transferforsignal(node node) {
if (!compareandsetwaitstatus(node, node.condition, 0))
return false;
node p = enq(node);
int c = p.waitstatus;
if (c > 0 || !compareandsetwaitstatus(p, c, node.signal))
locksupport.unpark(node.thread);
return true;
}
上面就是唤醒一个await*()线程的过程,根据前面的小节介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入aqs的队列。所以可以看到在locksupport.unpark之前调用了enq(node)操作,将当前节点加入到aqs队列。
整个锁机制的原理就介绍完了,从下一节开始就进入了锁机制的应用了。
©2009-2014 imxylz
|求贤若渴