本篇blog将讲述coroutine的一些背景知识,以及在java中如何使用coroutine,包括一个简单的benchmark对比,希望能借助这篇blog让大家了解到更多在java中使用coroutine的方法,本篇blog的pdf版本可从此下载:
在讲到具体内容之前,不能不先讲下coroutine的一些背景知识,来先具体了解下什么是coroutine。
现在的操作系统都是支持多任务的,多任务可通过多进程或多线程的方式去实现,进程和线程的对比就不在这里说了,在多任务的调度上操作系统采取抢占式和协作式两种方式,抢占式是指操作系统给每个任务一定的执行时间片,在到达这个时间片后如任务仍然未释放对cpu的占用,那么操作系统将强制释放,这是目前多数操作系统采取的方式;协作式是指操作系统按照任务的顺序来分配cpu,每个任务执行过程中除非其主动释放,否则将一直占据cpu,这种方式非常值得注意的是一旦有任务占据cpu不放,会导致其他任务”饿死”的现象,因此操作系统确实不太适合采用这种方式。
说完操作系统多任务的调度方式后,来看看通常程序是如何实现支持高并发的,一种就是典型的基于操作系统提供的多进程或多线程机制,每个任务占据一个进程或一个线程,当任务中有io等待等动作时,则将进程或线程放入待调度队列中,这种方式是目前大多数程序采取的方式,这种方式的坏处在于如想支持高的并发量,就不得不创建很多的进程或线程,而进程和线程都是要消耗不少系统资源的,另外一方面,进程或线程创建太多后,操作系统需要花费很多的时间在进程或线程的切换上,切换动作需要做状态保持和恢复,这也会消耗掉很多的系统资源;另外一种方式则是每个任务不完全占据一个进程或线程,当任务执行过程中需要进行io等待等动作时,任务则将其所占据的进程或线程释放,以便其他任务使用这个进程或线程,这种方式的好处在于可以减少所需要的原生的进程或线程数,并且由于操作系统不需要做进程或线程的切换,而是自行来实现任务的切换,其成本会较操作系统切换低,这种方式也就是本文的重点,coroutine方式,又称协程方式,这种方式在目前的大多数语言中都有支持。
各种语言在实现coroutine方式的支持时,多数都采用了actor
model来实现,actor model简单来说就是每个任务就是一个actor,actor之间通过消息传递的方式来进行交互,而不采用共享的方式,actor可以看做是一个轻量级的进程或线程,通常在一台4g内存的机器上,创建几十万个actor是毫无问题的,actor支持continuations,即对于如下代码:
actor
act方法
进行一些处理
创建并执行另外一个actor
通过消息box阻塞获取另一个actor执行的结果
继续基于这个结果进行一些处理
在支持continuations的情况下,可以做到消息box阻塞时并不是进程或线程级的阻塞,而只是actor本身的阻塞,并且在阻塞时可将所占据的进程或线程释放给其他actor使用,actor model实现最典型的就是erlang了。
对于java应用而言,传统方式下为了支持高并发,由于一个线程只能用于处理一个请求,即使是线程中其实有很多io中断、锁等待也同样如此,因此通常的做法是通过启动很多的线程来支撑高并发,但当线程过多时,就造成了cpu需要消耗不少的时间在线程的切换上,从而出现瓶颈,按照上面对coroutine的描述,coroutine的方式理论上而言能够大幅度的提升java应用所能支撑的并发量。
java尚不能从语言层次上支持coroutine,也许java 7能够支持,目前已经有了一个测试性质的版本,在sun jdk 7尚未正式发布的情况下如希望在java中使用coroutine,scala或kilim是可以做的选择,来分别看下。
scala是现在很火的语言之一,twitter消息中间件基于scala编写更是让scala名声鹊起,除了在语法方面所做出的改进外,其中一个最突出的特色就是scala actor,scala actor是scala用于实现coroutine的方式,先来具体看看scala在coroutine支持实现的关键概念。
l
actor
scala actor可以看做是一个轻量级的java
thread,其使用方式和java thread基本也一致,继承actor,实现act方法,启动时也是调用start方法,但和java thread不同的是,scala actor可等待外部发送过来的消息,并进行相应的处理。
l
actor的消息发送机制
发送消息到actor的方式有异步、future两种方式,异步即指发送后立即返回,继续后续流程,使用异步发送的方法为:actor ! messageobject,其中消息对象可以为任何类型,并且scala还支持一种称为case object的对象,便于在收到消息时做pattern matching。
future方式是指阻塞线程等待消息处理的结果,使用future方式发送的方法为:actor !! messageobject,在等待结果方面,scala支持不限时等待,限时等待以及等待多个future或个别future完成,使用方法如下:
val ft=actor !!
messageobject // future方式发送消息
val result=ft()
// 不限时等待
val
results=awaitall(500,ft1,ft2,ft3) // 限时等待多个future返回值
val results=awaiteither(ft1,ft2)
// 等待个别future完成
接收消息方通过reply方法返回future方式所等待的结果。
l
actor的消息接收机制
当代码处于actor的act方法或actor环境(例如为actor的act方法调用过来的代码)中时,可通过以下两种方式来接收外部发送给actor的消息:一为receive方式,二为react方式,代码例子如下:
receive{
case messageobject(args) =>
dohandle(args)
}
react{
case messageobject(args) =>
dohandle(args)
}
receive和react的差别在于receive需要阻塞当前java线程,react则仅为阻塞当前actor,但并不会阻塞java线程,因此react模式更适合于充分发挥coroutine带来的原生线程数减少的好处,但react模式有个缺点是react不支持返回。
receive和react都有限时接收的方式,方法为:receivewithin(timeout)、reactwithin(timeout),超时的消息通过case timeout的方式来接收。
下面来看基于scala actor实现并发处理请求的一个简单例子。
class
processor extends actor{
def
act(){
loop{
react{
case
command:string => dohandle(command)
}
}
}
def
dohandle(command:string){
//
业务逻辑处理
}
}
当需要并发执行此processor时,在处理时需要的仅为调用以下代码:
val
processor=new processor()
processor.start
processor ! “hello”
从以上说明来看,要在旧的应用中使用scala还是会有一些成本,部署运行则非常简单,在scala ide plugin编写了上面的scala代码后,即生成了java class文件,可直接在jvm中运行。
kilim是由剑桥的两位博士开发的一个用于在java中使用coroutine的框架,kilim基于java语法,先来看看kilim中的关键概念。
l
task
可以认为task就是actor,使用方式和java
thread基本相同,只是继承的为task,覆盖的为execute方法,启动也是调用task的start方法。
l
task的消息发送机制
kilim中通过mailbox对象来发送消息,mailbox的基本原则为可以有多个消息发送者,但只能有一个消息接收者,发送的方式有同步发送、异步发送和阻塞线程方式的同步发送三种,同步发送是指保证一定能将消息放入发送队列中,如当前发送队列已满,则等待到可用为止,阻塞的为当前task;异步发送则是尝试将消息放入发送队列一次,如失败,则返回false,成功则返回true,不会阻塞task;阻塞线程方式的同步发送是指阻塞当前线程,并保证将消息发送给接收者,三种方式的使用方法如下:
mailbox.put(messageobject);
// 同步发送
mailbox.putnb(messageobject);
// 异步发送
mailbox.putb(messageobject);
// 阻塞线程方式发送
l
task的消息接收机制
kilim中通过mailbox来接收消息,接收消息的方式有同步接收、异步接收以及阻塞线程方式的同步接收三种,同步接收是指阻塞当前task,直到接收到消息才返回;异步接收是指立刻返回mailbox中的消息,有就返回,没有则返回null;阻塞线程方式的同步接收是指阻塞当前线程,直到接收到消息才返回,使用方法如下:
mailbox.get();
// 同步接收,传入long参数表示等待的超时时间,单位为毫秒
mailbox.getnb();
// 异步接收,立刻返回
mailbox.getb();
// 阻塞线程方式接收
下面来看基于kilim实现并发处理请求的一个简单例子。
public
class processor extends task{
private
string command;
public
processor(string command){
this.command=command;
}
public void execute() throws pausable,exception{
// 业务逻辑处理
}
}
在处理时,仅需调用以下代码:
task
processor=new processor(command);
processor.start();
从以上代码来看,kilim对于java人员而言学习门槛更低,但对于需要采用coroutine方式执行的代码在编译完毕后,还需要采用kilim的kilim.tools.weaver类来对这些已编译出来的class文件做织入,运行时需要用织入后生成的class文件才行,织入的方法为:java kilim.tools.weaver –d [织入后生成的class文件存放的目录] [需要织入的类文件所在的目录],目前尚没有kilim ide
plugin可用,因此weaver这个过程还是比较的麻烦。
上面对scala和kilim做了一个简单的介绍,在实际java应用中使用coroutine时,通常会出现以下几种典型的更复杂的使用场景,由于actor模式本身就是异步的,因此其天然对异步场景支持的就非常好,更多的问题会出现在以下几个同步场景上,分别来看看基于scala、kilim如何来实现。
l
actor同步调用
actor同步调用是经常会出现的使用场景,主要为actor发送消息给其他的actor处理,并等待结果才能继续。
n
scala
对于这种情况,在scala 2.7.7中,目前可采取的为以下两种方法:
一种为通过future方式发送消息来实现:
class
processor(command:string) extends actor{
def act(){
val actor=new
netsenderactor()
val ft=actor !! command
println(ft())
}
}
class
netsenderactor extends actor{
def act(){
case command:string => {
reply(“received
command:” command)
}
}
}
第二种为通过receive的方式来实现:
class
processor(command:string) extends actor{
def act(){
val actor=new
netsenderactor()
actor ! command
var senderresult=””
receive{
case result:string => {
senderresult=result
}
}
println(senderresult)
}
}
class
netsenderactor extends actor{
def act(){
case command:string => {
sender ! (“received
command:” command)
}
}
}
但这两种方式其实都不好,因为这两种方式都会造成当前actor的线程阻塞,这也是因为目前scala版本对continuations尚不支持的原因,scala 2.8版本将提供continuations的支持,希望到时能有不需要阻塞actor线程实现上述需求的方法。
还有一种常见的场景是actor调一段普通的scala类,然后那个类中进行了一些处理,并调用了其他actor,此时在该类中如需要等待actor的返回结果,也可使用上面两种方法。
n
kilim
在kilim中要实现task之间的同步调用非常简单,代码如下:
public class
taska extends task{
public void execute() throws
pausable,exception{
mailbox