深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1) -凯发k8网页登录

关注后端架构、中间件、分布式和并发编程

   :: 凯发k8网页登录首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  111 随笔 :: 10 文章 :: 2680 评论 :: 0 trackbacks

在《并发容器 part 4 并发队列与queue简介》节中的类图中可以看到,对于queue来说,blockingqueue是主要的线程安全版本。这是一个可阻塞的版本,也就是允许添加/删除元素被阻塞,直到成功为止。

blockingqueue相对于queue而言增加了两个操作:put/take。下面是一张整理的表格。

看似简单的api,非常有用。这在控制队列的并发上非常有好处。既然加入队列和移除队列能够被阻塞,这在实现生产者-消费者模型上就简单多了。

清单1 是生产者-消费者模型的一个例子。这个例子是一个真实的场景。服务端(ice服务)接受客户端的请求(accept),请求计算此人的好友生日,然后将计算的结果存取缓存中(memcache)中。在这个例子中采用了executorservice实现多线程的功能,尽可能的提高吞吐量,这个在后面线程池的部分会详细说明。目前就可以理解为new thread(r).start()就可以了。另外这里阻塞队列使用的是linkedblockingqueue。

清单1 一个生产者-消费者例子

package xylz.study.concurrency;

import java.util.concurrent.blockingqueue;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.linkedblockingdeque;

public class birthdayservice {

    final int workernumber;

    final worker[] workers;

    final executorservice threadpool;

    static volatile boolean running = true;

    public birthdayservice(int workernumber, int capacity) {
        if (workernumber <= 0) throw new illegalargumentexception();
        this.workernumber = workernumber;
        workers = new worker[workernumber];
        for (int i = 0; i < workernumber; i ) {
            workers[i] = new worker(capacity);
        }
        //
        boolean b = running;// kill the resorting
        threadpool = executors.newfixedthreadpool(workernumber);
        for (worker w : workers) {
            threadpool.submit(w);
        }
    }

    worker getworker(int id) {
        return workers[id % workernumber];

    }

    class worker implements runnable {

        final blockingqueue queue;

        public worker(int capacity) {
            queue = new linkedblockingqueue(capacity);
        }

        public void run() {
            while (true) {
                try {
                    consume(queue.take());
                } catch (interruptedexception e) {
                    return;
                }
            }
        }

        void put(int id) {
            try {
                queue.put(id);
            } catch (interruptedexception e) {
                return;
            }
        }
    }

    public void accept(int id) {
        //accept client request
        getworker(id).put(id);
    }

    protected void consume(int id) {
        //do the work
        //get the list of friends and save the birthday to cache
    }
}

 

在清单1 中可以看到不管是put()还是get(),都抛出了一个interruptedexception。我们就从这里开始,为什么会抛出这个异常。

 

上一节中提到实现一个并发队列有三种方式。显然只有第二种 lock 才能实现阻塞队列。在锁机制中提到过,lock结合condition就可以实现线程的阻塞,这在锁机制部分的很多工具中都详细介绍过,而接下来要介绍的linkedblockingqueue就是采用这种方式。

 

linkedblockingqueue 原理

 

对比concurrentlinkedqueue的结构图,linkedblockingqueue多了两个reentrantlock和两个condition以及用于计数的atomicinteger,显然这会导致linkedblockingqueue的实现有点复杂。对照此结构,有以下几点说明:

    1. 但是整体上讲,linkedblockingqueue和concurrentlinkedqueue的结构类似,都是采用头尾节点,每个节点指向下一个节点的结构,这表示它们在操作上应该类似。
    2. linkedblockingqueue引入了原子计数器count,这意味着获取队列大小size()已经是常量时间了,不再需要遍历队列。每次队列长度有变更时只需要修改count即可。
    3. 有了修改node指向有了锁,所以不需要volatile特性了。既然有了锁node的item为什么需要volatile在后面会详细分析,暂且不表。
    4. 引入了两个锁,一个入队列锁,一个出队列锁。当然同时有一个队列不满的condition和一个队列不空的condition。其实参照锁机制前面介绍过的生产者-消费者模型就知道,入队列就代表生产者,出队列就代表消费者。为什么需要两个锁?一个锁行不行?其实一个锁完全可以,但是一个锁意味着入队列和出队列同时只能有一个在进行,另一个必须等待其释放锁。而从concurrentlinkedqueue的实现原理来看,事实上head和last (concurrentlinkedqueue中是tail)是分离的,互相独立的,这意味着入队列实际上是不会修改出队列的数据的,同时出队列也不会修改入队列,也就是说这两个操作是互不干扰的。更通俗的将,这个锁相当于两个写入锁,入队列是一种写操作,操作head,出队列是一种写操作,操作tail。可见它们是无关的。但是并非完全无关,后面详细分析。

 

在没有揭示入队列和出队列过程前,暂且猜测下实现原理。

根据前面学到的锁机制原理结合concurrentlinkedqueue的原理,入队列的阻塞过程大概是这样的:

    1. 获取入队列的锁putlock,检测队列大小,如果队列已满,那么就挂起线程,等待队列不满信号notfull的唤醒。
    2. 将元素加入到队列尾部,同时修改队列尾部引用last。
    3. 队列大小加1。
    4. 释放锁putlock。
    5. 唤醒notempty线程(如果有挂起的出队列线程),告诉消费者,已经有了新的产品。

 

对比入队列,出队列的阻塞过程大概是这样的:

    1. 获取出队列的锁takelock,检测队列大小,如果队列为空,那么就挂起线程,等待队列不为空notempty的唤醒。
    2. 将元素从头部移除,同时修改队列头部引用head。
    3. 队列大小减1。
    4. 释放锁takelock。
    5. 唤醒notfull线程(如果有挂起的入队列线程),告诉生产者,现在还有空闲的空间。

下面来验证上面的过程。

 

入队列过程(put/offer)

 

清单2 阻塞的入队列过程

public void put(e e) throws interruptedexception {
    if (e == null) throw new nullpointerexception();
    int c = -1;
    final reentrantlock putlock = this.putlock;
    final atomicinteger count = this.count;
    putlock.lockinterruptibly();
    try {
        try {
            while (count.get() == capacity)
                notfull.await();
        } catch (interruptedexception ie) {
            notfull.signal(); // propagate to a non-interrupted thread
            throw ie;
        }
        insert(e);
        c = count.getandincrement();
        if (c 1 < capacity)
            notfull.signal();
    } finally {
        putlock.unlock();
    }
    if (c == 0)
        signalnotempty();
}

清单2 描述的是入队列的阻塞过程。可以看到和上面描述的入队列的过程基本相同。但是也有以下几个问题:

    1. 如果在入队列的时候线程被中断,那么就需要发出一个notfull的信号,表示下一个入队列的线程能够被唤醒(如果阻塞的话)。
    2. 入队列成功后如果队列不满需要补一个notfull的信号。为什么?队列不满的时候其它入队列的阻塞线程难道不知道么?有可能。这是因为为了减少上下文切换的次数,每次唤醒一个线程(不管是入队列还是出队列)都是只随机唤醒一个(notify),而不是唤醒所有的(notifyall())。这会导致其它阻塞的入队列线程不能够即使处理队列不满的情况。
    3. 如果队列不为空并且可能有一个元素的话就唤醒一个出队列线程。这么做说明之前队列一定为空,因为在加入队列之后队列最多只能为1,那么说明未加入之前是0,那么就可能有被阻塞的出队列线程,所以就唤醒一个出队列线程。特别说明的是为什么使用一个临时变量c,而不用count。这是因为读取一个count的开销比读取一个临时一个变量大,而此处c又能够完成确认队列最多只有一个元素的判断。首先c默认为-1,如果加入队列后获取原子计数器的结果为0,说明之前队列为空,不可能消费(出队列),也不可能入队列,因为此时锁还在当前线程上,那么加入一个后队列就不为空了,所以就可以安全的唤醒一个消费(出对立)线程。
    4. 入队列的过程允许被中断,所以总是抛出interruptedexception 异常。

针对第2点,特别补充说明下。本来这属于锁机制中条件队列的范围,由于没有应用场景,所以当时没有提。

前面提高notifyall总是比notify更可靠,因为notify可能丢失通知,为什么不适用notifyall呢?

先解释下notify丢失通知的问题。

 

notify丢失通知问题

假设线程a因为某种条件在条件队列中等待,同时线程b因为另外一种条件在同一个条件队列中等待,也就是说线程a/b都被同一个conditon.await()挂起,但是等待的条件不同。现在假设线程b的线程被满足,线程c执行一个notify操作,此时jvm从conditon.await()的多个线程(a/b)中随机挑选一个唤醒,不幸的是唤醒了a。此时a的条件不满足,于是a继续挂起。而此时b仍然在傻傻的等待被唤醒的信号。也就是说本来给b的通知却被一个无关的线程持有了,真正需要通知的线程b却没有得到通知,而b仍然在等待一个已经发生过的通知。

如果使用notifyall,则能够避免此问题。notifyall会唤醒所有正在等待的线程,线程c发出的通知线程a同样能够收到,但是由于对于a没用,所以a继续挂起,而线程b也收到了此通知,于是线程b正常被唤醒。

 

既然notifyall能够解决单一notify丢失通知的问题,那么为什么不总是使用notifyall替换notify呢?

假设有n个线程在条件队列中等待,调用notifyall会唤醒所有线程,然后这n个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果n比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。

如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用notify呢?所以某些情况下使用notify的性能是要高于notifyall的。

如果满足下面的条件,可以使用单一的notify取代notifyall操作:

相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait放回后执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。

也就是说理论上讲在put/take中如果使用sinallall唤醒的话,那么在清单2 中的notfull.singal就是多余的。

 

出队列过程(poll/take)

 

再来看出队列过程。清单3 描述了出队列的过程。可以看到这和入队列是对称的。从这里可以看到,出队列使用的是和入队列不同的锁,所以入队列、出队列这两个操作才能并行进行。

清单3 阻塞的出队列过程

public e take() throws interruptedexception {
    e x;
    int c = -1;
    final atomicinteger count = this.count;
    final reentrantlock takelock = this.takelock;
    takelock.lockinterruptibly();
    try {
        try {
            while (count.get() == 0)
                notempty.await();
        } catch (interruptedexception ie) {
            notempty.signal(); // propagate to a non-interrupted thread
            throw ie;
        }

        x = extract();
        c = count.getanddecrement();
        if (c > 1)
            notempty.signal();
    } finally {
        takelock.unlock();
    }
    if (c == capacity)
        signalnotfull();
    return x;
}

 

为什么有异常?

 

有了入队列、出队列的过程后再来回答前面的几个问题。

为什么总是抛出interruptedexception 异常? 这是很大一块内容,其实是java对线程中断的处理问题,希望能够在系列文章的最后能够对此开辟单独的篇章来谈谈。

在锁机制里面也是总遇到,这是因为,java里面没有一种直接的方法中断一个挂起的线程,所以通常情况下等于一个处于waiting状态的线程,允许设置一个中断位,一旦线程检测到这个中断位就会从waiting状态退出,以一个interruptedexception 的异常返回。所以只要是对一个线程挂起操作都会导致interruptedexception 的可能,比如thread.sleep()、thread.join()、object.wait()。尽管locksupport.park()不会抛出一个interruptedexception 异常,但是它会将当前线程的的interrupted状态位置上,而对于lock/condition而言,当捕捉到interrupted状态后就认为线程应该终止任务,所以就抛出了一个interruptedexception 异常。

 

又见volatile

 

还有一个不容易理解的问题。为什么node.item是volatile类型的?

起初我不大明白,因为对于一个进入队列的node,它的item是不变,当且仅当出队列的时候会将头结点元素的item 设置为null。尽管在remove(o)的时候也是设置为null,但是那时候是加了putlock/takelock两个锁的,所以肯定是没有问题的。那么问题出在哪?

我们知道,item的值是在put/offer的时候加入的。这时候都是有putlock锁保证的,也就是说它保证使用putlock锁的读取肯定是没有问题的。那么问题就只可能出在一个不适用putlock却需要读取node.item的地方。

peek操作时获取头结点的元素而不移除它。显然他不会操作尾节点,所以它不需要putlock锁,也就是说它只有takelock锁。清单4 描述了这个过程。

清单4 查询队列头元素过程

public e peek() {
    if (count.get() == 0)
        return null;
    final reentrantlock takelock = this.takelock;
    takelock.lock();
    try {
        node first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takelock.unlock();
    }
}

清单4 描述了peek的过程,最后返回一个非null节点的结果是node.item。这里读取了node的item值,但是整个过程却是使用了takelock而非putlock。换句话说putlock对node.item的操作,peek()线程可能不可见!

清单5 队列尾部加入元素

private void insert(e x) {
    last = last.next = new node(x);
}

 

清单5 是入队列offer/put的一部分,这里关键在于last=new node(x)可能发生重排序。node构造函数是这样的:node(e x) { item = x; }。在这一步里面我们可能得到以下一种情况:

    1. 构建一个node对象n;
    2. 将node的n赋给last
    3. 初始化n,设置item=x

在执行步骤2 的时候一个peek线程可能拿到了新的node n,这时候它读取item,得到了一个null。显然这是不可靠的。

对item采用volatile之后,jmm保证对item=x的赋值一定在last=n之前,也就是说last得到的一个是一个已经赋值了的新节点n。这就不会导致读取空元素的问题的。

出对了poll/take和peek都是使用的takelock锁,所以不会导致此问题。

删除操作和遍历操作由于同时获取了takelock和putlock,所以也不会导致此问题。

总结:当前仅当元素加入队列时读取此元素才可能导致不一致的问题。采用volatile正式避免此问题。

 

附加功能

 

blockingqueue有一个额外的功能,允许批量从队列中异常元素。这个api是:

int drainto(collection c, int maxelements); 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。

int drainto(collection c); 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。

清单6 描述的是最多移除指定数量元素的过程。由于批量操作只需要一次获取锁,所以效率会比每次获取锁要高。但是需要说明的,需要同时获取takelock/putlock两把锁,因为当移除完所有元素后这会涉及到尾节点的修改(last节点仍然指向一个已经移走的节点)。

由于迭代操作contains()/remove()/iterator()也是获取了两个锁,所以迭代操作也是线程安全的。

 

清单6 批量移除操作

public int drainto(collection c, int maxelements) {
    if (c == null)
        throw new nullpointerexception();
    if (c == this)
        throw new illegalargumentexception();
    fullylock();
    try {
        int n = 0;
        node p = head.next;
        while (p != null && n < maxelements) {
            c.add(p.item);
            p.item = null;
            p = p.next;
            n;
        }
        if (n != 0) {
            head.next = p;
            assert head.item == null;
            if (p == null)
                last = head;
            if (count.getandadd(-n) == capacity)
                notfull.signalall();
        }
        return n;
    } finally {
        fullyunlock();
    }
}

 



©2009-2014 imxylz
|求贤若渴
posted on 2010-07-24 00:02 imxylz 阅读(19494) 评论(6)     所属分类: java concurrency
# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1) 2011-02-15 10:00
blockingqueue接口四种形式操作,中文api有说明:
”blockingqueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。“
很显然,依据条件是:"不能立即满足但可能在将来某一时刻可以满足的操作"所产生的不同输出。
这样就很容易理解最上面表格所表述内容了。  回复  
  

# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1) 2011-02-15 10:07 xylz
@hixiaomin
嗯,理解非常不错!  回复  
  

# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1)[未登录] 2011-07-19 13:54
你的版本是不是太老了,node.item不是volatile,drainto也只用了一把takelock。其他感觉还不错。
  回复  
  

# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1) 2011-08-16 20:51
@小牛犊
你的是什么版本的 我的1.6.0_21.跟lz一样  回复  
  

# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1)[未登录] 2011-09-05 15:47
@yintiefu
我用的是1.6.0_24的, 有较大的改变  回复  
  

# re: 深入浅出 java concurrency (21): 并发容器 part 6 可阻塞的blockingqueue (1)[未登录] 2013-12-25 17:31
这里对于volatile的分析,觉得老主多虑了.之所以有volatile,是因为之前需要借助volatile的数据一致性,那时可能还没有使用lock加锁,但后面有了lock之后,lock之内的程序也是保证happens-before的,所以dong lea忘了把volatile拿掉,目前在1.6.0.27之后已经没有了.如果按照楼主的分析,那岂不这个类有明显的bug.  回复  
  


©2009-2014
网站地图