前记
很早以前就有读netty源码的打算了,然而第一次尝试的时候从netty4开始,一直抓不到核心的框架流程,后来因为其他事情忙着就放下了。这次趁着休假重新捡起这个硬骨头,因为netty3现在还在被很多项目使用,因而这次决定先从netty3入手,瞬间发现netty3的代码比netty4中规中矩的多,很多概念在代码本身中都有清晰的表达,所以半天就把整个框架的骨架搞清楚了。再读,回去读netty4的源码,反而觉得轻松了,一种豁然开朗的感觉。
记得去年读jetty源码的时候,因为代码太庞大,并且自己的http server的了解太少,因而只能自底向上的一个一个模块的叠加,直到最后把所以的模块连接在一起而看清它的真正核心骨架。现在读源码,开始习惯先把骨架理清,然后延伸到不同的器官、血肉而看清整个人体。
本文从reactor模式在netty3中的应用,引出netty3的整体架构以及控制流程;然而除了reactor模式,netty3还在channelpipeline中使用了
intercepting filter模式,这个模式也在servlet的filter中成功使用,因而本文还会从intercepting filter模式出发详细介绍channelpipeline的设计理念。本文假设读者已经对netty有一定的了解,因而不会包含过多入门介绍,以及帮netty做宣传的文字。
netty3中的reactor模式
reactor模式在netty中应用非常成功,因而它也是在netty中受大肆宣传的模式,关于reactor模式可以详细参考本人的另一篇文章
《reactor模式详解》,对reactor模式的实现是netty3的基本骨架,因而本小节会详细介绍reactor模式如何应用netty3中。
如果读《reactor模式详解》,我们知道reactor模式由handle、synchronous event demultiplexer、initiation dispatcher、event handler、concrete event handler构成,在java的实现版本中,channel对应handle,selector对应synchronous event demultiplexer,并且netty3还使用了两层reactor:main reactor用于处理client的连接请求,sub reactor用于处理和client连接后的读写请求(关于这个概念还可以参考doug lea的这篇ppt:)。所以我们先要解决netty3中使用什么类实现所有的上述模块并把他们联系在一起的,以nio实现方式为例:
模式是一种抽象,但是在实现中,经常会因为语言特性、框架和性能需要而做一些改变,因而netty3对reactor模式的实现有一套自己的设计:
1. channelevent:reactor是基于事件编程的,因而在netty3中使用channelevent抽象的表达netty3内部可以产生的各种事件,所有这些事件对象在channels帮助类中产生,并且由它将事件推入到channelpipeline中,channelpipeline构建channelhandler管道,channelevent流经这个管道实现所有的业务逻辑处理。channelevent对应的事件有:channelstateevent表示channel状态的变化事件,而如果当前channel存在parent channel,则该事件还会传递到parent channel的channelpipeline中,如open、bound、connected、interest_ops等,该事件可以在各种不同实现的channel、channelsink中产生;messageevent表示从socket中读取数据完成、需要向socket写数据或channelhandler对当前message解析(如decoder、encoder)后触发的事件,它由nioworker、需要对message做进一步处理的channelhandler产生;writecompletionevent表示写完成而触发的事件,它由nioworker产生;exceptionevent表示在处理过程中出现的exception,它可以发生在各个构件中,如channel、channelsink、nioworker、channelhandler中;idlestateevent由idlestatehandler触发,这也是一个channelevent可以无缝扩展的例子。注:在netty4后,已经没有channelevent类,所有不同事件都用对应方法表达,这也意味这channelevent不可扩展,netty4采用在channelinboundhandler中加入usereventtriggered()方法来实现这种扩展,具体可以参考。
2. channelhandler:在netty3中,channelhandler用于表示reactor模式中的eventhandler。channelhandler只是一个标记接口,它有两个子接口:channeldownstreamhandler和channelupstreamhandler,其中channeldownstreamhandler表示从用户应用程序流向netty3内部直到向socket写数据的管道,在netty4中改名为channeloutboundhandler;channelupstreamhandler表示数据从socket进入netty3内部向用户应用程序做数据处理的管道,在netty4中改名为channelinboundhandler。
3. channelpipeline:用于管理channelhandler的管道,每个channel一个channelpipeline实例,可以运行过程中动态的向这个管道中添加、删除channelhandler(由于实现的限制,在最末端的channelhandler向后添加或删除channelhandler不一定在当前执行流程中起效,参考)。channelpipeline内部维护一个channelhandler的双向链表,它以upstream(inbound)方向为正向,downstream(outbound)方向为方向。channelpipeline采用intercepting filter模式实现,具体可以参考
这里,这个模式的实现在后一节中还是详细介绍。
4. nioselector:netty3使用nioselector来存放selector(synchronous event demultiplexer),每个新产生的nio channel都向这个selector注册自己以让这个selector监听这个nio channel中发生的事件,当事件发生时,调用帮助类channels中的方法生成channelevent实例,将该事件发送到这个netty channel对应的channelpipeline中,而交给各级channelhandler处理。其中在向selector注册nio channel时,netty channel实例以attachment的形式传入,该netty channel在其内部的nio channel事件发生时,会以attachment的形式存在于selectionkey中,因而每个事件可以直接从这个attachment中获取相关链的netty channel,并从netty channel中获取与之相关联的channelpipeline,这个实现和doug lea的一模一样。另外netty3还采用了中相同的main reactor和sub reactor设计,其中nioselector的两个实现:boss即为main reactor,nioworker为sub reactor。boss用来处理新连接加入的事件,nioworker用来处理各个连接对socket的读写事件,其中boss通过nioworkerpool获取nioworker实例,netty3模式使用roundrobin方式放回nioworker实例。更形象一点的,可以通过的这张图表达:
若与ractor模式对应,nioselector中包含了synchronous event demultiplexer,而channelpipeline中管理着所有eventhandler,因而nioselector和channelpipeline共同构成了initiation dispatcher。
5. channelsink:在channelhandler处理完成所有逻辑需要向客户端写响应数据时,一般会调用netty channel中的write方法,然而在这个write方法实现中,它不是直接向其内部的socket写数据,而是交给channels帮助类,内部创建downstreammessageevent,反向从channelpipeline的管道中流过去,直到第一个channelhandler处理完毕,最后交给channelsink处理,以避免阻塞写而影响程序的吞吐量。channelsink将这个messageevent提交给netty channel中的writebufferqueue,最后nioworker会等到这个nio channel已经可以处理写事件时无阻塞的向这个nio channel写数据。这就是上图的send是从subreactor直接出发的原因。
6. channel:netty有自己的channel抽象,它是一个资源的容器,包含了所有一个连接涉及到的所有资源的饮用,如封装nio channel、channelpipeline、boss、nioworkerpool等。另外它还提供了向内部nio channel写响应数据的接口write、连接/绑定到某个地址的connect/bind接口等,个人感觉虽然对channel本身来说,因为它封装了nio channel,因而这些接口定义在这里是合理的,但是如果考虑到netty的架构,它的channel只是一个资源容器,有这个channel实例就可以得到和它相关的基本所有资源,因而这种write、connect、bind动作不应该再由它负责,而是应该由其他类来负责,比如在netty4中就在channelhandlercontext添加了write方法,虽然netty4并没有删除channel中的write接口。
netty3中的intercepting filter模式
如果说reactor模式是netty3的骨架,那么intercepting filter模式则是netty的中枢。reactor模式主要应用在netty3的内部实现,它是netty3具有良好性能的基础,而intercepting filter模式则是channelhandler组合实现一个应用程序逻辑的基础,只有很好的理解了这个模式才能使用好netty,甚至能得心应手。
关于intercepting filter模式的详细介绍可以参考
这里,本节主要介绍netty3中对intercepting filter模式的实现,其实就是defaultchannelpipeline对intercepting filter模式的实现。在上文有提到netty3的channelpipeline是channelhandler的容器,用于存储与管理channelhandler,同时它在netty3中也起到桥梁的作用,即它是连接netty3内部到所有channelhandler的桥梁。作为channelpipeline的实现者defaultchannelpipeline,它使用一个channelhandler的双向链表来存储,以defaultchannelpipelinecontext作为节点:
public interface channelhandlercontext {
channel getchannel();
channelpipeline getpipeline();
string getname();
channelhandler gethandler();
boolean canhandleupstream();
boolean canhandledownstream();
void sendupstream(channelevent e);
void senddownstream(channelevent e);
object getattachment();
void setattachment(object attachment);
}
private final class defaultchannelhandlercontext implements channelhandlercontext {
volatile defaultchannelhandlercontext next;
volatile defaultchannelhandlercontext prev;
private final string name;
private final channelhandler handler;
private final boolean canhandleupstream;
private final boolean canhandledownstream;
private volatile object attachment;
.....
}
在defaultchannelpipeline中,它存储了和当前channelpipeline相关联的channel、channelsink以及channelhandler链表的head、tail,所有channelevent通过sendupstream、senddownstream为入口流经整个链表:
public class defaultchannelpipeline implements channelpipeline {
private volatile channel channel;
private volatile channelsink sink;
private volatile defaultchannelhandlercontext head;
private volatile defaultchannelhandlercontext tail;
......
public void sendupstream(channelevent e) {
defaultchannelhandlercontext head = getactualupstreamcontext(this.head);
if (head == null) {
return;
}
sendupstream(head, e);
}
void sendupstream(defaultchannelhandlercontext ctx, channelevent e) {
try {
((channelupstreamhandler) ctx.gethandler()).handleupstream(ctx, e);
} catch (throwable t) {
notifyhandlerexception(e, t);
}
}
public void senddownstream(channelevent e) {
defaultchannelhandlercontext tail = getactualdownstreamcontext(this.tail);
if (tail == null) {
try {
getsink().eventsunk(this, e);
return;
} catch (throwable t) {
notifyhandlerexception(e, t);
return;
}
}
senddownstream(tail, e);
}
void senddownstream(defaultchannelhandlercontext ctx, channelevent e) {
if (e instanceof upstreammessageevent) {
throw new illegalargumentexception("cannot send an upstream event to downstream");
}
try {
((channeldownstreamhandler) ctx.gethandler()).handledownstream(ctx, e);
} catch (throwable t) {
e.getfuture().setfailure(t);
notifyhandlerexception(e, t);
}
}
对upstream事件,向后找到所有实现了channelupstreamhandler接口的channelhandler组成链(
getactualupstreamcontext()),而对downstream事件,向前找到所有实现了channeldownstreamhandler接口的channelhandler组成链(
getactualdownstreamcontext()):
private defaultchannelhandlercontext getactualupstreamcontext(defaultchannelhandlercontext ctx) {
if (ctx == null) {
return null;
}
defaultchannelhandlercontext realctx = ctx;
while (!realctx.canhandleupstream()) {
realctx = realctx.next;
if (realctx == null) {
return null;
}
}
return realctx;
}
private defaultchannelhandlercontext getactualdownstreamcontext(defaultchannelhandlercontext ctx) {
if (ctx == null) {
return null;
}
defaultchannelhandlercontext realctx = ctx;
while (!realctx.canhandledownstream()) {
realctx = realctx.prev;
if (realctx == null) {
return null;
}
}
return realctx;
}
在实际实现channelupstreamhandler或channeldownstreamhandler时,调用
channelhandlercontext中的sendupstream或senddownstream方法将控制流程交给下一个
channelupstreamhandler或下一个channeldownstreamhandler,或调用channel中的write方法发送
响应消息。
public class mychannelupstreamhandler implements channelupstreamhandler {
public void handleupstream(channelhandlercontext ctx, channelevent e) throws exception {
// handle current logic, use channel to write response if needed.
// ctx.getchannel().write(message);
ctx.sendupstream(e);
}
}
public class mychanneldownstreamhandler implements channeldownstreamhandler {
public void handledownstream(
channelhandlercontext ctx, channelevent e) throws exception {
// handle current logic
ctx.senddownstream(e);
}
}
当channelhandler向channelpipelinecontext发送事件时,其内部从当前channelpipelinecontext节点出发找到下一个channelupstreamhandler或channeldownstreamhandler实例,并向其发送channelevent,对于downstream链,如果到达链尾,则将channelevent发送给channelsink:
public void senddownstream(channelevent e) {
defaultchannelhandlercontext prev = getactualdownstreamcontext(this.prev);
if (prev == null) {
try {
getsink().eventsunk(defaultchannelpipeline.this, e);
} catch (throwable t) {
notifyhandlerexception(e, t);
}
} else {
defaultchannelpipeline.this.senddownstream(prev, e);
}
}
public void sendupstream(channelevent e) {
defaultchannelhandlercontext next = getactualupstreamcontext(this.next);
if (next != null) {
defaultchannelpipeline.this.sendupstream(next, e);
}
}
正是因为这个实现,如果在一个末尾的channelupstreamhandler中先移除自己,在向末尾添加一个新的channelupstreamhandler,它是无效的,因为它的next已经在调用前就固定设置为null了。
channelpipeline作为channelhandler的容器,它还提供了各种增、删、改channelhandler链表中的方法,而且如果某个channelhandler还实现了lifecycleawarechannelhandler,则该channelhandler在被添加进channelpipeline或从中删除时都会得到同志:
public interface lifecycleawarechannelhandler extends channelhandler {
void beforeadd(channelhandlercontext ctx) throws exception;
void afteradd(channelhandlercontext ctx) throws exception;
void beforeremove(channelhandlercontext ctx) throws exception;
void afterremove(channelhandlercontext ctx) throws exception;
}
public interface channelpipeline {
void addfirst(string name, channelhandler handler);
void addlast(string name, channelhandler handler);
void addbefore(string basename, string name, channelhandler handler);
void addafter(string basename, string name, channelhandler handler);
void remove(channelhandler handler);
channelhandler remove(string name);
<t extends channelhandler> t remove(class<t> handlertype);
channelhandler removefirst();
channelhandler removelast();
void replace(channelhandler oldhandler, string newname, channelhandler newhandler);
channelhandler replace(string oldname, string newname, channelhandler newhandler);
<t extends channelhandler> t replace(class<t> oldhandlertype, string newname, channelhandler newhandler);
channelhandler getfirst();
channelhandler getlast();
channelhandler get(string name);
<t extends channelhandler> t get(class<t> handlertype);
channelhandlercontext getcontext(channelhandler handler);
channelhandlercontext getcontext(string name);
channelhandlercontext getcontext(class extends channelhandler> handlertype);
void sendupstream(channelevent e);
void senddownstream(channelevent e);
channelfuture execute(runnable task);
channel getchannel();
channelsink getsink();
void attach(channel channel, channelsink sink);
boolean isattached();
list<string> getnames();
map<string, channelhandler> tomap();
}
在defaultchannelpipeline的channelhandler链条的处理流程为:
参考:
posted on 2015-09-04 09:40
dlevin 阅读(7567)
评论(0) 编辑 收藏 所属分类:
architecture 、
netty