在java中使用协程(coroutine)

本篇blog将讲述coroutine的一些背景知识,以及在java中如何使用coroutine,包括一个简单的benchmark对比,希望能借助这篇blog让大家了解到更多在java中使用coroutine的方法,本篇blog的pdf版本可从此下载:
在讲到具体内容之前,不能不先讲下
coroutine的一些背景知识,来先具体了解下什么是coroutine

         现在的操作系统都是支持多任务的,多任务可通过多进程或多线程的方式去实现,进程和线程的对比就不在这里说了,在多任务的调度上操作系统采取抢占式和协作式两种方式,抢占式是指操作系统给每个任务一定的执行时间片,在到达这个时间片后如任务仍然未释放对cpu的占用,那么操作系统将强制释放,这是目前多数操作系统采取的方式;协作式是指操作系统按照任务的顺序来分配cpu,每个任务执行过程中除非其主动释放,否则将一直占据cpu,这种方式非常值得注意的是一旦有任务占据cpu不放,会导致其他任务饿死的现象,因此操作系统确实不太适合采用这种方式。

         说完操作系统多任务的调度方式后,来看看通常程序是如何实现支持高并发的,一种就是典型的基于操作系统提供的多进程或多线程机制,每个任务占据一个进程或一个线程,当任务中有io等待等动作时,则将进程或线程放入待调度队列中,这种方式是目前大多数程序采取的方式,这种方式的坏处在于如想支持高的并发量,就不得不创建很多的进程或线程,而进程和线程都是要消耗不少系统资源的,另外一方面,进程或线程创建太多后,操作系统需要花费很多的时间在进程或线程的切换上,切换动作需要做状态保持和恢复,这也会消耗掉很多的系统资源;另外一种方式则是每个任务不完全占据一个进程或线程,当任务执行过程中需要进行io等待等动作时,任务则将其所占据的进程或线程释放,以便其他任务使用这个进程或线程,这种方式的好处在于可以减少所需要的原生的进程或线程数,并且由于操作系统不需要做进程或线程的切换,而是自行来实现任务的切换,其成本会较操作系统切换低,这种方式也就是本文的重点,coroutine方式,又称协程方式,这种方式在目前的大多数语言中都有支持。

         各种语言在实现coroutine方式的支持时,多数都采用了actor model来实现,actor model简单来说就是每个任务就是一个actoractor之间通过消息传递的方式来进行交互,而不采用共享的方式,actor可以看做是一个轻量级的进程或线程,通常在一台4g内存的机器上,创建几十万个actor是毫无问题的,actor支持continuations,即对于如下代码:

         actor

                   act方法

                            进行一些处理

创建并执行另外一个actor

                            通过消息box阻塞获取另一个actor执行的结果

                            继续基于这个结果进行一些处理

         在支持continuations的情况下,可以做到消息box阻塞时并不是进程或线程级的阻塞,而只是actor本身的阻塞,并且在阻塞时可将所占据的进程或线程释放给其他actor使用,actor model实现最典型的就是erlang了。

         对于java应用而言,传统方式下为了支持高并发,由于一个线程只能用于处理一个请求,即使是线程中其实有很多io中断、锁等待也同样如此,因此通常的做法是通过启动很多的线程来支撑高并发,但当线程过多时,就造成了cpu需要消耗不少的时间在线程的切换上,从而出现瓶颈,按照上面对coroutine的描述,coroutine的方式理论上而言能够大幅度的提升java应用所能支撑的并发量。

         java尚不能从语言层次上支持coroutine,也许java 7能够支持,目前已经有了一个测试性质的版本,在sun jdk 7尚未正式发布的情况下如希望在java中使用coroutinescalakilim是可以做的选择,来分别看下。

         scala是现在很火的语言之一,twitter消息中间件基于scala编写更是让scala名声鹊起,除了在语法方面所做出的改进外,其中一个最突出的特色就是scala actorscala actorscala用于实现coroutine的方式,先来具体看看scalacoroutine支持实现的关键概念。

l  actor

scala actor可以看做是一个轻量级的java thread,其使用方式和java thread基本也一致,继承actor,实现act方法,启动时也是调用start方法,但和java thread不同的是,scala actor可等待外部发送过来的消息,并进行相应的处理。

l  actor的消息发送机制

发送消息到actor的方式有异步、future两种方式,异步即指发送后立即返回,继续后续流程,使用异步发送的方法为:actor ! messageobject,其中消息对象可以为任何类型,并且scala还支持一种称为case object的对象,便于在收到消息时做pattern matching

future方式是指阻塞线程等待消息处理的结果,使用future方式发送的方法为:actor !! messageobject,在等待结果方面,scala支持不限时等待,限时等待以及等待多个future或个别future完成,使用方法如下:

val ft=actor !! messageobject // future方式发送消息

val result=ft() // 不限时等待

val results=awaitall(500,ft1,ft2,ft3)  // 限时等待多个future返回值

val results=awaiteither(ft1,ft2) // 等待个别future完成

接收消息方通过reply方法返回future方式所等待的结果。

l  actor的消息接收机制

当代码处于actoract方法或actor环境(例如为actoract方法调用过来的代码)中时,可通过以下两种方式来接收外部发送给actor的消息:一为receive方式,二为react方式,代码例子如下:

receive{

         case messageobject(args) => dohandle(args)

}

react{

         case messageobject(args) => dohandle(args)

}

receivereact的差别在于receive需要阻塞当前java线程,react则仅为阻塞当前actor,但并不会阻塞java线程,因此react模式更适合于充分发挥coroutine带来的原生线程数减少的好处,但react模式有个缺点是react不支持返回。

receivereact都有限时接收的方式,方法为:receivewithin(timeout)reactwithin(timeout),超时的消息通过case timeout的方式来接收。

下面来看基于scala actor实现并发处理请求的一个简单例子。

         class processor extends actor{

                   def act(){

                            loop{

                                     react{

                                               case command:string => dohandle(command)

}

}

                   }

 

                   def dohandle(command:string){

                            // 业务逻辑处理

}

}

当需要并发执行此processor时,在处理时需要的仅为调用以下代码:

val processor=new processor()

processor.start

processor ! “hello”

         从以上说明来看,要在旧的应用中使用scala还是会有一些成本,部署运行则非常简单,在scala ide plugin编写了上面的scala代码后,即生成了java class文件,可直接在jvm中运行。

kilim是由剑桥的两位博士开发的一个用于在java中使用coroutine的框架,kilim基于java语法,先来看看kilim中的关键概念。

l  task

可以认为task就是actor,使用方式和java thread基本相同,只是继承的为task,覆盖的为execute方法,启动也是调用taskstart方法。

l  task的消息发送机制

kilim中通过mailbox对象来发送消息,mailbox的基本原则为可以有多个消息发送者,但只能有一个消息接收者,发送的方式有同步发送、异步发送和阻塞线程方式的同步发送三种,同步发送是指保证一定能将消息放入发送队列中,如当前发送队列已满,则等待到可用为止,阻塞的为当前task;异步发送则是尝试将消息放入发送队列一次,如失败,则返回false,成功则返回true,不会阻塞task;阻塞线程方式的同步发送是指阻塞当前线程,并保证将消息发送给接收者,三种方式的使用方法如下:

mailbox.put(messageobject); // 同步发送

mailbox.putnb(messageobject); // 异步发送

mailbox.putb(messageobject); // 阻塞线程方式发送

l  task的消息接收机制

kilim中通过mailbox来接收消息,接收消息的方式有同步接收、异步接收以及阻塞线程方式的同步接收三种,同步接收是指阻塞当前task,直到接收到消息才返回;异步接收是指立刻返回mailbox中的消息,有就返回,没有则返回null;阻塞线程方式的同步接收是指阻塞当前线程,直到接收到消息才返回,使用方法如下:

mailbox.get(); // 同步接收,传入long参数表示等待的超时时间,单位为毫秒

mailbox.getnb(); // 异步接收,立刻返回

mailbox.getb(); // 阻塞线程方式接收

下面来看基于kilim实现并发处理请求的一个简单例子。

         public class processor extends task{

                   private string command;

                   public processor(string command){

                            this.command=command;

}

public void execute() throws pausable,exception{

         // 业务逻辑处理

}

}

在处理时,仅需调用以下代码:

task processor=new processor(command);

processor.start();

从以上代码来看,kilim对于java人员而言学习门槛更低,但对于需要采用coroutine方式执行的代码在编译完毕后,还需要采用kilimkilim.tools.weaver类来对这些已编译出来的class文件做织入,运行时需要用织入后生成的class文件才行,织入的方法为:java kilim.tools.weaver –d [织入后生成的class文件存放的目录] [需要织入的类文件所在的目录],目前尚没有kilim ide plugin可用,因此weaver这个过程还是比较的麻烦。

上面对scalakilim做了一个简单的介绍,在实际java应用中使用coroutine时,通常会出现以下几种典型的更复杂的使用场景,由于actor模式本身就是异步的,因此其天然对异步场景支持的就非常好,更多的问题会出现在以下几个同步场景上,分别来看看基于scalakilim如何来实现。

l  actor同步调用

actor同步调用是经常会出现的使用场景,主要为actor发送消息给其他的actor处理,并等待结果才能继续。

n  scala

对于这种情况,在scala 2.7.7中,目前可采取的为以下两种方法:

一种为通过future方式发送消息来实现:

class processor(command:string) extends actor{

         def act(){

                   val actor=new netsenderactor()

                   val ft=actor !! command

                   println(ft())

}

}

class netsenderactor extends actor{

         def act(){

                   case command:string => {

                            reply(“received command:” command)

}

}

}

第二种为通过receive的方式来实现:

class processor(command:string) extends actor{

         def act(){

                   val actor=new netsenderactor()

                   actor ! command

                   var senderresult=””

receive{

                            case result:string => {

senderresult=result

}

}

println(senderresult)

}

}

class netsenderactor extends actor{

         def act(){

                   case command:string => {

                            sender ! (“received command:” command)

}

}

}

但这两种方式其实都不好,因为这两种方式都会造成当前actor的线程阻塞,这也是因为目前scala版本对continuations尚不支持的原因,scala 2.8版本将提供continuations的支持,希望到时能有不需要阻塞actor线程实现上述需求的方法。

还有一种常见的场景是actor调一段普通的scala类,然后那个类中进行了一些处理,并调用了其他actor,此时在该类中如需要等待actor的返回结果,也可使用上面两种方法。

n  kilim

kilim中要实现task之间的同步调用非常简单,代码如下:

public class taska extends task{

         public void execute() throws pausable,exception{

                   mailbox

网站地图