上善若水
in general the oo style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. to do is to be -nietzsche, to bei is to do -kant, do be do be do -sinatra
posts - 146,comments - 147,trackbacks - 0

一直想好好学习concurrent包中的各个类的实现,然而经常看了一点就因为其他事情干扰而放下了。发现这样太不利于自己的成长了,因而最近打算潜心一件一件的完成自己想学习的东西。

对concurrent包的学习打算先从lock的实现开始,因而自然而然的就端起了abstractqueuedsynchronizer,然而要读懂这个类的源码并不是那么容易,因而我就开始问自己一个问题:如果自己要去实现这个一个lock对象,应该如何实现呢?

要实现lock对象,首先理解什么是锁?我自己从编程角度简单的理解,所谓锁对象(互斥锁)就是它能保证一次只有一个线程能进入它保护的临界区,如果有一个线程已经拿到锁对象,那么其他对象必须让权等待,而在该线程退出这个临界区时需要唤醒等待列表中的其他线程。更学术一些,中对同步机制准则的归纳(p50):

  1. 空闲让进。当无进程处于临界区时,表明临界资源处于空闲状态,应允许一个请求进入临界区的进程立即进入自己的临界区,以有效的利用临界资源。
  2. 忙则等待。当已有进程进入临界区时,表明临界资源正在被访问,因而其他试图进入临界区的进程必须等待,以保证对临界区资源的互斥访问。
  3. 有限等待。对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区,以免陷入“死等”状态。
  4. 让权等待。当进程不能进入自己的临界区时,应该释放处理机,以免进程陷入“忙等”状态。

说了那么多,其实对互斥锁很简单,只需要一个标记位,如果该标记位为0,表示没有被占用,因而直接获得锁,然后把该标记位置为1,此时其他线程发现该标记位已经是1,因而需要等待。这里对这个标记位的比较并设值必须是原子操作,而在jdk5以后提供的atomic包里的工具类可以很方便的提供这个原子操作。然而上面的四个准则应该漏了一点,即释放锁的线程(进程)和得到锁的线程(进程)应该是同一个,就像一把钥匙对应一把锁(理想的),所以一个非常简单的lock类可以这么实现:

public class spinlockv1 {
    
private final atomicinteger state = new atomicinteger(0);
    
private volatile thread owner; // 这里owner字段可能存在中间值,不可靠,因而其他线程不可以依赖这个字段的值
    
    
public void lock() {
        
while (!state.compareandset(01)) { }
        owner 
= thread.currentthread();
    }
    
    
public void unlock() {
        thread currentthread 
= thread.currentthread();
        
if (owner != currentthread || !state.compareandset(10)) {
            
throw new illegalstateexception("the lock is not owned by thread: "  currentthread);
        }
        owner 
= null;
    }
}

一个简单的测试方法:

    @test
    
public void testlockcorrectly() throws interruptedexception {
        
final int count = 100;
        thread[] threads 
= new thread[count];
        spinlockv1 lock 
= new spinlockv1();
        addrunner runner 
= new addrunner(lock);
        
for (int i = 0; i < count; i) { 
            threads[i] 
= new thread(runner, "thread-"  i);
            threads[i].start();
        }
        
        
for (int i = 0; i < count; i) {
            threads[i].join();
        }
        
        assertequals(count, runner.getstate());
    }
    
    
private static class addrunner implements runnable {
        
private final spinlockv1 lock;
        
private int state = 0;

        
public addrunner(spinlockv1 lock) {
            
this.lock = lock;
        }
        
        
public void run() {
            lock.lock();
            
try {
                quietsleep(
10);
                state
;
                system.out.println(thread.currentthread().getname() 
 ""  state);
            } 
finally {
                lock.unlock();
            }
        }
        
        
public int getstate() {
            
return state;
        }
    }

然而这个spinlock其实并不需要state这个字段,因为owner的赋值与否也是一种状态,因而可以用它作为一种互斥状态:

public class spinlockv2 {
    
private final atomicreference<thread> owner = new atomicreference<thread>(null);
    
    
public void lock() {
        
final thread currentthread = thread.currentthread();
        
while (!owner.compareandset(null, currentthread)) { }
    }
    
    
public void unlock() {
        thread currentthread 
= thread.currentthread();
        
if (!owner.compareandset(currentthread, null)) {
            
throw new illegalstateexception("the lock is not owned by thread: "  currentthread);
        }
    }
}

这在操作系统中被定义为整形信号量,然而整形信号量如果没拿到锁会一直处于“忙等”状态(没有遵循有限等待和让权等待的准则),因而这种锁也叫spin lock,在短暂的等待中它可以提升性能,因为可以减少线程的切换,concurrent包中的atomic大部分都采用这种机制实现,然而如果需要长时间的等待,“忙等”会占用不必要的cpu时间,从而性能会变的很差,这个时候就需要将没有拿到锁的线程放到等待列表中,这种方式在操作系统中也叫记录型信号量,它遵循了让权等待准则(当前没有实现有限等待准则)。在jdk6以后提供了locksupport.park()/locksupport.unpark()操作,可以将当前线程放入一个等待列表或将一个线程从这个等待列表中唤醒。然而这个park/unpark的等待列表是一个全局的等待列表,在unpartk的时候还是需要提供需要唤醒的thread对象,因而我们需要维护自己的等待列表,但是如果我们可以用jdk提供的工具类concurrentlinkedqueue,就非常容易实现,如locksupport文档中给出来的:

class fifomutex {
   
private final atomicboolean locked = new atomicboolean(false);
   
private final queue<thread> waiters = new concurrentlinkedqueue<thread>();

   
public void lock() {
     
boolean wasinterrupted = false;
     thread current 
= thread.currentthread();
     waiters.add(current);

     
// block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||!locked.compareandset(falsetrue)) {
        locksupport.park(
this);
        
if (thread.interrupted()) // ignore interrupts while waiting
          wasinterrupted = true;
     }

     waiters.remove();
     
if (wasinterrupted)          // reassert interrupt status on exit
        current.interrupt();
   }

   
public void unlock() {
     locked.set(
false);
     locksupport.unpark(waiters.peek());
   }
 }

在该代码事例中,有一个线程等待队列和锁标记字段,每次调用lock时先将当前线程放入这个等待队列中,然后拿出队列头线程对象,如果该线程对象正好是当前线程,并且成功 使用cas方式设置locked字段(这里需要两个同时满足,因为可能出现一个线程已经从队列中移除了但还没有unlock,此时另一个线程调用lock方法,此时队列头的线程就是第二个线程,然而由于第一个线程还没有unlock或者正在unlock,因而需要使用cas原子操作来判断是否要park),表示该线程竞争成功,获得锁,否则将当前线程park,这里之所以要放在 while循环中,因为park操作可能无理由返回(spuriously),如文档中给出的描述:

locksupport.park()
public static void park( blocker)
disables the current thread for thread scheduling purposes unless the permit is available.

if the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

  • some other thread invokes with the current thread as the target; or
  • some other thread the current thread; or
  • the call spuriously (that is, for no reason) returns.

this method does not report which of these caused the method to return. callers should re-check the conditions which caused the thread to park in the first place. callers may also determine, for example, the interrupt status of the thread upon return.

parameters:
blocker - the synchronization object responsible for this thread parking
since:
1.6
我在实现自己的类时就被这个“无理由返回”坑了好久。对于已经获得锁的线程,将该线程从等待队列中移除,这里由于concurrentlinkedqueue是线程安全的,因而能保证每次都是队列头的线程得到锁,因而在得到锁匙将队列头移除。unlock逻辑比较简单,只需要将locked字段打开(设置为false),唤醒(unpark)队列头的线程即可,然后该线程会继续在lock方法的while循环中继续竞争unlocked字段,并将它自己从线程队列中移除表示获得锁成功。当然安全起见,最好在unlock中加入一些验证逻辑,如解锁的线程和加锁的线程需要相同。

然而本文的目的是自己实现一个lock对象,即只使用一些基本的操作,而不使用jdk提供的atomic类和concurrentlinkedqueue。类似的首先我们也需要一个队列存放等待线程队列(公平起见,使用先进先出队列),因而先定义一个node对象用以构成这个队列:

 

    protected static class node {
        
volatile thread owner;
        
volatile node prev;
        
volatile node next;
        
        
public node(thread owner) {
            
this.owner = owner;
            
this.state = init;
        }
        
        
public node() {
            
this(thread.currentthread());
        }
    }

简单起见,队列头是一个起点的placeholder,每个调用lock的线程都先将自己竞争放入这个队列尾,每个队列头后一个线程(node)即是获得锁的线程,所以我们需要有head node字段用以快速获取队列头的后一个node,而tail node字段用来快速插入新的node,所以关键在于如何线程安全的构建这个队列,方法还是一样的,使用cas操作,即cas方法将自己设置成tail值,然后重新构建这个列表:

    protected boolean enqueue(node node) {
        
while (true) {
            
final node pretail = tail;
            node.prev 
= pretail;
            
if (compareandsettail(pretail, node)) {
                pretail.next 
= node;
                
return node.prev == head;
            }
        }
    }

在当前线程node以线程安全的方式放入这个队列后,lock实现相对就比较简单了,如果当前node是的前驱是head,该线程获得锁,否则park当前线程,处理park无理由返回的问题,因而将park放入while循环中(该实现是一个不可重入的实现):

    public void lock() {
        
// put the latest node to a queue first, then check if the it is the first node
        
// this way, the list is the only shared resource to deal with
        node node = new node();
        
if (enqueue(node)) {
            current 
= node.owner;
        } 
else {
            
while (node.prev != head) {
                locksupport.park(
this); // this may return "spuriously"!!, so put it to while
            }

            current 
= node.owner;
        }
    }

unlock的实现需要考虑多种情况,如果当前node(head.next)有后驱,那么直接unpark该后驱即可;如果没有,表示当前已经没有其他线程在等待队列中,然而在这个判断过程中可能会有其他线程进入,因而需要用cas的方式设置tail,如果设置失败,表示此时有其他线程进入,因而需要将该新进入的线程unpark从而该新进入的线程在调用park后可以立即返回(这里的cas和enqueue的cas都是对tail操作,因而能保证状态一致):

    public void unlock() {
        node curnode 
= unlockvalidate();
        node next 
= curnode.next;
        
if (next != null) {
           
head.next = next;
            next.prev 
= head;
            locksupport.unpark(next.owner);
        } 
else {
            
if (!compareandsettail(curnode, head)) {
               
while (curnode.next == null) { } // wait until the next available
                // another node queued during the time, so we have to unlock that, or else, this node can never unparked
                unlock();
            } 
else {
               
compareandsetnext(head, curnode, null); // still use cas here as the head.next may already been changed
            }
        }
    }

具体的代码和测试类可以参考查看。


其实直到自己写完这个类后才直到者其实这是一个mcs锁的变种,因而这个实现每个线程park在自身对应的node上,而由前一个线程unpark它;而abstractqueuedsynchronizer是clh锁,因为它的park由前驱状态决定,虽然它也是由前一个线程unpark它。具体可以参考。

posted on 2015-08-11 06:08 dlevin 阅读(5369) 评论(0)  编辑  收藏 所属分类: codetoolsmultithreading

只有注册用户后才能发表评论。


网站导航:
              
 
网站地图