blogjava-凯发k8网页登录

blogjava-凯发k8网页登录http://www.blogjava.net/dlevin/in general the oo style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. to do is to be -nietzsche, to bei is to do -kant, do be do be do -sinatrazh-cnsat, 08 apr 2023 20:39:24 gmtsat, 08 apr 2023 20:39:24 gmt60使用namedparameterjdbctemplate遇到无法使用的坑http://www.blogjava.net/dlevin/archive/2015/11/11/428149.htmldlevindlevinwed, 11 nov 2015 10:46:00 gmthttp://www.blogjava.net/dlevin/archive/2015/11/11/428149.htmlhttp://www.blogjava.net/dlevin/comments/428149.htmlhttp://www.blogjava.net/dlevin/archive/2015/11/11/428149.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/428149.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/428149.html最近一直在捣鼓hbase的项目,之前写了一些代码从数据库加载数据到hbase,所有的代码都跑得好好地,然而今天尝试着换了一个数据库,就跑不通了。通过数据工具,可以发现连接没有问题,而且有部分逻辑很顺利通过了,然而有一些就是卡主了,通过jstack打印出来的信息可以找到这样的堆栈:
"runner{object-loader#292}-objecthandler" #323 prio=5 os_prio=0 tid=0x00002aaadc5ec800 nid=0x7f62 in object.wait() [0x0000000056ce4000]
   java.lang.thread.state: waiting (on object monitor)
        at java.lang.object.wait(native method)
        at java.lang.object.wait(object.java:502)
        at org.apache.commons.pool.impl.genericobjectpool.borrowobject(genericobjectpool.java:1104)
        - locked <0x00000007736013e8> (a org.apache.commons.pool.impl.genericobjectpool$latch)
        at org.apache.commons.dbcp.poolingdatasource.getconnection(poolingdatasource.java:106)
        at org.apache.commons.dbcp.basicdatasource.getconnection(basicdatasource.java:1044)
        at org.springframework.jdbc.datasource.datasourceutils.dogetconnection(datasourceutils.java:111)
        at org.springframework.jdbc.datasource.datasourceutils.getconnection(datasourceutils.java:77)
所以开始我怀疑是连接的问题,从网上也找到了一个类型的现象,有人怀疑是dbpc的一个bug导致死锁:http://stackoverflow.com/questions/5714511/deadlock-issue-in-dbcp-deployed-on-tomcat,所以我升级了dbcp版本1.4,然而和这人一样的结果,升级dbcp版本并没有解决问题。简单的看dbcp的代码,都开始怀疑是不是因为没有spring jdbctemplate没有正确的把connection返回回去引起泄漏了,然而也有点感觉不太可能,因为这段代码在其他数据库都跑得好好地,但是我们的数据库版本都是一致的,然而其他配置上也被假设一致了(被忽略的一个重要的点)。

后来开始调配置,减少连接数,减少线程数,经过各种组合,发现当把db读的batch降到1的时候就可以work了,非常诡异的一个问题。从数据工具中查到,如果用batch,得到的sql是:
select from where iid in (@p0, @p1)如果是batch是1的话:
select  from 
 where iid in (@p0)这段sql语句是这么产生的:
datasource datasource = ....
this.jdbc = new namedparameterjdbctemplate(datasource);
...
mapsqlparametersource parameters = new mapsqlparametersource();
parameters.addvalue("params", paramsmap.keyset());
jdbc.query("select from
where in (:params)";, parameters, new resultsetextractor() {
....
})
如果是一个batch的话,在jstack堆栈中可以看到它一直在等数据库的返回结果:
"runner{object-loader#16}-objecthandler" #47 prio=5 os_prio=0 tid=0x0000000006ddd800 nid=0x2694 runnable [0x0000000045434000]
   java.lang.thread.state: runnable
        at java.net.socketinputstream.socketread0(native method)
        at java.net.socketinputstream.socketread(socketinputstream.java:116)
        at java.net.socketinputstream.read(socketinputstream.java:170)
        at java.net.socketinputstream.read(socketinputstream.java:141)
        at com.sybase.jdbc3.timedio.rawdbio.reallyread(unknown source)
        at com.sybase.jdbc3.timedio.dbio.doread(unknown source)
        at com.sybase.jdbc3.timedio.instreammgr.a(unknown source)
        at com.sybase.jdbc3.timedio.instreammgr.doread(unknown source)
        at com.sybase.jdbc3.tds.tdsprotocolcontext.getchunk(unknown source)
这也解释了第一个堆栈一直停在borrowobject(getconnection)的阶段,因为之前所有的connection都在数据库堵住没有返回,所以这个线程再拿connection的时候超过了我设置的最大connection数,所以就等着拿不到connection。

在后来查了一下不同数据库的jdbc driver信息(sp_version):
jconnect (tm) for jdbc(tm)/7.07 esd #4 (build 26793)/p/ebf20302/jdk 1.6.0/jdbcmain/opt/thu jul  5 22:08:44 pdt 2012
jconnect (tm) for jdbc(tm)/1000/wed mar 11 05:01:24 2015 pdt

也就是说这种用法是因为旧的jdbc driver对namedparameterjdbctemplate不完善引起的,这个坑花了我一整天的时间。。。。

dlevin 2015-11-11 18:46 发表评论
]]>sstable详解http://www.blogjava.net/dlevin/archive/2015/09/25/427481.htmldlevindlevinthu, 24 sep 2015 17:35:00 gmthttp://www.blogjava.net/dlevin/archive/2015/09/25/427481.htmlhttp://www.blogjava.net/dlevin/comments/427481.htmlhttp://www.blogjava.net/dlevin/archive/2015/09/25/427481.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/427481.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/427481.html 前记几年前在读google的bigtable论文的时候,当时并没有理解论文里面表达的思想,因而囫囵吞枣,并没有注意到sstable的概念。再后来开始关注hbase的设计和源码后,开始对bigtable传递的思想慢慢的清晰起来,但是因为事情太多,没有安排出时间重读bigtable的论文。在项目里,我因为自己在学hbase,开始主推hbase,而另一个同事则因为对cassandra比较感冒,因而他主要关注cassandra的设计,不过我们两个人偶尔都会讨论一下技术、设计的各种观点和心得,然后他偶然的说了一句:cassandra和hbase都采用sstable格式存储,然后我本能的问了一句:什么是sstable?他并没有回答,可能也不是那么几句能说清楚的,或者他自己也没有尝试的去问过自己这个问题。然而这个问题本身却一直困扰着我,因而趁着现在有一些时间深入学习hbase和cassandra相关设计的时候先把这个问题弄清楚了。

sstable的定义

要解释这个术语的真正含义,最好的方法就是从它的出处找答案,所以重新翻开bigtable的论文。在这篇论文中,最初对sstable是这么描述的(第三页末和第四页初):
sstable

the google sstable file format is used internally to store bigtable data. an sstable provides a persistent, ordered immutable map from keys to values, where both keys and values are arbitrary byte strings. operations are provided to look up the value associated with a specified key, and to iterate over all key/value pairs in a specified key range. internally, each sstable contains a sequence of blocks (typically each block is 64kb in size, but this is configurable). a block index (stored at the end of the sstable) is used to locate blocks; the index is loaded into memory when the sstable is opened. a lookup can be performed with a single disk seek: we first find the appropriate block by performing a binary search in the in-memory index, and then reading the appropriate block from disk. optionally, an sstable can be completely mapped into memory, which allows us to perform lookups and scans without touching disk.

简单的非直译:
sstable是bigtable内部用于数据的文件格式,它的格式为文件本身就是一个排序的、不可变的、持久的key/value对map,其中key和value都可以是任意的byte字符串。使用key来查找value,或通过给定key范围遍历所有的key/value对。每个sstable包含一系列的block(一般block大小为64kb,但是它是可配置的),在sstable的末尾是block索引,用于定位block,这些索引在sstable打开时被加载到内存中,在查找时首先从内存中的索引二分查找找到block,然后一次磁盘寻道即可读取到相应的block。还有一种方案是将这个sstable加载到内存中,从而在查找和扫描中不需要读取磁盘。

这个貌似就是hfile第一个版本的格式么,贴张图感受一下:

在hbase使用过程中,对这个版本的hfile遇到以下一些问题(参考):
1. 解析时内存使用量比较高。
2. bloom filter和block索引会变的很大,而影响启动性能。具体的,bloom filter可以增长到100mb每个hfile,而block索引可以增长到300mb,如果一个hregionserver中有20个hregion,则他们分别能增长到2gb和6gb的大小。hregion需要在打开时,需要加载所有的block索引到内存中,因而影响启动性能;而在第一次request时,需要将整个bloom filter加载到内存中,再开始查找,因而bloom filter太大会影响第一次请求的延迟。
而hfile在版本2中对这些问题做了一些优化,具体会在hfile解析时详细说明。

sstable作为存储使用

继续bigtable的论文往下走,在5.3 tablet serving小节中这样写道(第6页):
tablet serving

updates are committed to a commit log that stores redo records. of these updates, the recently committed ones are stored in memory in a sorted buffer called a memtable; the older updates are stored in a sequence of sstables. to recover a tablet, a tablet server reads its metadata from the metadata table. this metadata contains the list of sstables that comprise a tablet and a set of a redo points, which are pointers into any commit logs that may contain data for the tablet. the server reads the indices of the sstables into memory and reconstructs the memtable by applying all of the updates that have committed since the redo points.

when a write operation arrives at a tablet server, the server checks that it is well-formed, and that the sender is authorized to perform the mutation. authorization is performed by reading the list of permitted writers from a chubby file (which is almost always a hit in the chubby client cache). a valid mutation is written to the commit log. group commit is used to improve the throughput of lots of small mutations [13, 16]. after the write has been committed, its contents are inserted into the memtable.

when a read operation arrives at a tablet server, it is similarly checked for well-formedness and proper authorization. a valid read operation is executed on a merged view of the sequence of sstables and the memtable. since the sstables and the memtable are lexicographically sorted data structures, the merged view can be formed efficiently.

incoming read and write operations can continue while tablets are split and merged.

第一段和第三段简单描述,非翻译:
在新数据写入时,这个操作首先提交到日志中作为redo纪录,最近的数据存储在内存的排序缓存memtable中;旧的数据存储在一系列的sstable 中。在recover中,tablet server从metadata表中读取metadata,metadata包含了组成tablet的所有sstable(纪录了这些sstable的元 数据信息,如sstable的位置、startkey、endkey等)以及一系列日志中的redo点。tablet server读取sstable的索引到内存,并replay这些redo点之后的更新来重构memtable。
在读时,完成格式、授权等检查后,读会同时读取sstable、memtable(hbase中还包含了blockcache中的数据)并合并他们的结果,由于sstable和memtable都是字典序排列,因而合并操作可以很高效完成。

sstable在compaction过程中的使用

在bigtable论文5.4 compaction小节中是这样说的:
compaction

as write operations execute, the size of the memtable increases. when the memtable size reaches a threshold, the memtable is frozen, a new memtable is created, and the frozen memtable is converted to an sstable and written to gfs. this minor compaction process has two goals: it shrinks the memory usage of the tablet server, and it reduces the amount of data that has to be read from the commit log during recovery if this server dies. incoming read and write operations can continue while compactions occur.

every minor compaction creates a new sstable. if this behavior continued unchecked, read operations might need to merge updates from an arbitrary number of sstables. instead, we bound the number of such files by periodically executing a merging compaction in the background. a merging compaction reads the contents of a few sstables and the memtable, and writes out a new sstable. the input sstables and memtable can be discarded as soon as the compaction has finished.

a merging compaction that rewrites all sstables into exactly one sstable is called a major compaction. sstables produced by non-major compactions can contain special deletion entries that suppress deleted data in older sstables that are still live. a major compaction, on the other hand, produces an sstable that contains no deletion information or deleted data. bigtable cycles through all of its tablets and regularly applies major compactions to them. these major compactions allow bigtable to reclaim resources used by deleted data, and also allow it to ensure that deleted data disappears from the system in a timely fashion, which is important for services that store sensitive data.

随着memtable大小增加到一个阀值,这个memtable会被冻住而创建一个新的memtable以供使用,而旧的memtable会转换成一个sstable而写道gfs中,这个过程叫做minor compaction。这个minor compaction可以减少内存使用量,并可以减少日志大小,因为持久化后的数据可以从日志中删除。在minor compaction过程中,可以继续处理读写请求。
每次minor compaction会生成新的sstable文件,如果sstable文件数量增加,则会影响读的性能,因而每次读都需要读取所有sstable文件,然后合并结果,因而对sstable文件个数需要有上限,并且时不时的需要在后台做merging compaction,这个merging compaction读取一些sstable文件和memtable的内容,并将他们合并写入一个新的sstable中。当这个过程完成后,这些源sstable和memtable就可以被删除了。
如果一个merging compaction是合并所有sstable到一个sstable,则这个过程称做major compaction。一次major compaction会将mark成删除的信息、数据删除,而其他两次compaction则会保留这些信息、数据(mark的形式)。bigtable会时不时的扫描所有的tablet,并对它们做major compaction。这个major compaction可以将需要删除的数据真正的删除从而节省空间,并保持系统一致性。

sstable的locality和in memory

在bigtable中,它的本地性是由locality group来定义的,即多个column family可以组合到一个locality group中,在同一个tablet中,使用单独的sstable存储这些在同一个locality group的column family。hbase把这个模型简化了,即每个column family在每个hregion都使用单独的hfile存储,hfile没有locality group的概念,或者一个column family就是一个locality group。

在bigtable中,还可以支持在locality group级别设置是否将所有这个locality group的数据加载到内存中,在hbase中通过column family定义时设置。这个内存加载采用延时加载,主要应用于一些小的column family,并且经常被用到的,从而提升读的性能,因而这样就不需要再从磁盘中读取了。

sstable压缩

bigtable的压缩是基于locality group级别:
compression

clients can control whether or not the sstables for a locality group are compressed, and if so, which compression format is used. the user-specified compression format is applied to each sstable block (whose size is controllable via a locality group specific tuning parameter). although we lose some space by compressing each block separately, we benefit in that small portions of an sstable can be read without decompressing the entire file. many clients use a two-pass custom compression scheme. the first pass uses bentley and mcilroy’s scheme [6], which compresses long common strings across a large window. the second pass uses a fast compression algorithm that looks for repetitions in a small 16 kb window of the data. both compression passes are very fast—they encode at 100–200 mb/s, and decode at 400–1000 mb/s on modern machines.

bigtable的压缩以sstable中的一个block为单位,虽然每个block为压缩单位损失一些空间,但是采用这种方式,我们可以以block为单位读取、解压、分析,而不是每次以一个“大”的sstable为单位读取、解压、分析。

sstable的读缓存

为了提升读的性能,bigtable采用两层缓存机制:
caching for read performance

to improve read performance, tablet servers use two levels of caching. the scan cache is a higher-level cache that caches the key-value pairs returned by the sstable interface to the tablet server code. the block cache is a lower-level cache that caches sstables blocks that were read from gfs. the scan cache is most useful for applications that tend to read the same data repeatedly. the block cache is useful for applications that tend to read data that is close to the data they recently read (e.g., sequential reads, or random reads of different columns in the same locality group within a hot row).

两层缓存分别是:
1. high level,缓存从sstable读取的key/value对。提升那些倾向重复的读取相同的数据的操作(引用局部性原理)。
2. low level,blockcache,缓存sstable中的block。提升那些倾向于读取相近数据的操作。

bloom filter

前文有提到bigtable采用合并读,即需要读取每个sstable中的相关数据,并合并成一个结果返回,然而每次读都需要读取所有sstable,自然会耗费性能,因而引入了bloom filter,它可以很快速的找到一个rowkey不在某个sstable中的事实(注:反过来则不成立)。
bloom filter

as described in section 5.3, a read operation has to read from all sstables that make up the state of a tablet. if these sstables are not in memory, we may end up doing many disk accesses. we reduce the number of accesses by allowing clients to specify that bloom fil- ters [7] should be created for sstables in a particu- lar locality group. a bloom filter allows us to ask whether an sstable might contain any data for a spec- ified row/column pair. for certain applications, a small amount of tablet server memory used for storing bloom filters drastically reduces the number of disk seeks re- quired for read operations. our use of bloom filters also implies that most lookups for non-existent rows or columns do not need to touch disk.

sstable设计成immutable的好处

在sstable定义中就有提到sstable是一个immutable的order map,这个immutable的设计可以让系统简单很多:
exploiting immutability

besides the sstable caches, various other parts of the bigtable system have been simplified by the fact that all of the sstables that we generate are immutable. for example, we do not need any synchronization of accesses to the file system when reading from sstables. as a result, concurrency control over rows can be implemented very efficiently. the only mutable data structure that is accessed by both reads and writes is the memtable. to reduce contention during reads of the memtable, we make each memtable row copy-on-write and allow reads and writes to proceed in parallel.

since sstables are immutable, the problem of permanently removing deleted data is transformed to garbage collecting obsolete sstables. each tablet’s sstables are registered in the metadata table. the master removes obsolete sstables as a mark-and-sweep garbage collection [25] over the set of sstables, where the metadata table contains the set of roots.

finally, the immutability of sstables enables us to split tablets quickly. instead of generating a new set of sstables for each child tablet, we let the child tablets share the sstables of the parent tablet.

关于immutable的优点有以下几点:
1. 在读sstable是不需要同步。读写同步只需要在memtable中处理,为了减少memtable的读写竞争,bigtable将memtable的row设计成copy-on-write,从而读写可以同时进行。
2. 永久的移除数据转变为sstable的garbage collect。每个tablet中的sstable在metadata表中有注册,master使用mark-and-sweep算法将sstable在gc过程中移除。
3. 可以让tablet split过程变的高效,我们不需要为每个子tablet创建新的sstable,而是可以共享父tablet的sstable。

dlevin 2015-09-25 01:35 发表评论
]]>
[转]高性能io模型浅析http://www.blogjava.net/dlevin/archive/2015/09/04/427118.htmldlevindlevinfri, 04 sep 2015 07:16:00 gmthttp://www.blogjava.net/dlevin/archive/2015/09/04/427118.htmlhttp://www.blogjava.net/dlevin/comments/427118.htmlhttp://www.blogjava.net/dlevin/archive/2015/09/04/427118.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/427118.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/427118.html高性能io模型浅析

转自:http://www.cnblogs.com/fanzhidongyzby/p/4098546.html

服务器端编程经常需要构造高性能的io模型,常见的io模型有四种:

(1)同步阻塞io(blocking io):即传统的io模型。

(2)同步非阻塞io(non-blocking io):默认创建的socket都是阻塞的,非阻塞io要求socket被设置为nonblock。注意这里所说的nio并非java的nio(new io)库。

(3)io多路复用(io multiplexing):即经典的reactor设计模式,有时也称为异步阻塞io,java中的selector和linux中的epoll都是这种模型。

(4)异步io(asynchronous io):即经典的proactor设计模式,也称为异步非阻塞io。

同步和异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起io请求后需要等待或者轮询内核io操作完成后才能继续执行;而异步是指用户线程发起io请求后仍继续执行,当内核io操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞和非阻塞的概念描述的是用户线程调用内核io操作的方式:阻塞是指io操作需要彻底完成后才返回到用户空间;而非阻塞是指io操作被调用后立即返回给用户一个状态值,无需等到io操作彻底完成。

另外,richard stevens 在《unix 网络编程》卷1中提到的基于信号驱动的io(signal driven io)模型,由于该模型并不常用,本文不作涉及。接下来,我们详细分析四种常见的io模型的实现原理。为了方便描述,我们统一使用io的读操作作为示例。

一、同步阻塞io

同步阻塞io模型是最简单的io模型,用户线程在内核进行io操作时被阻塞。

1 同步阻塞io

如图1所示,用户线程通过系统调用read发起io读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。

用户线程使用同步阻塞io模型的伪代码描述为:

{
    read(socket, buffer);
    process(buffer);
}

即用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个io请求的过程中,用户线程是被阻塞的,这导致用户在发起io请求时,不能做任何事情,对cpu的资源利用率不够。

二、同步非阻塞io

同步非阻塞io是在同步阻塞io的基础上,将socket设置为nonblock。这样做用户线程可以在发起io请求后可以立即返回。

 

图2 同步非阻塞io

如图2所示,由于socket是非阻塞的方式,因此用户线程发起io请求时立即返回。但并未读取到任何数据,用户线程需要不断地发起io请求,直到数据到达后,才真正读取到数据,继续执行。

用户线程使用同步非阻塞io模型的伪代码描述为:

{
    
while(read(socket, buffer) != success) { }
    process(buffer);
}

即 用户需要不断地调用read,尝试读取socket中的数据,直到读取成功后,才继续处理接收的数据。整个io请求的过程中,虽然用户线程每次发起io请 求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的cpu的资源。一般很少直接使用这种模型,而是在其他io模型中使用非阻 塞io这一特性。

三、io多路复用

io多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞io模型中轮询等待的问题。

图3 多路分离函数select

如图3所示,用户首先将需要进行io操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。

从 流程上来看,使用select函数进行io请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效 率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的io请求。用户可以注册多个socket,然后不断地调 用select读取被激活的socket,即可达到在同一个线程内同时处理多个io请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

用户线程使用select函数的伪代码描述为:

{
    select(socket);
    
while(1) {
        sockets 
= select();
        
for(socket in sockets) {
            
if(can_read(socket)) {
                read(socket, buffer);
                process(buffer);
            }
        }
    }
}

其中while循环前将socket添加到select监视中,然后在while内一直调用select获取被激活的socket,一旦socket可读,便调用read函数将socket中的数据读取出来。

然 而,使用select函数的优点并不仅限于此。虽然上述方式允许单线程内处理多个io请求,但是每个io请求的过程还是阻塞的(在select函数上阻 塞),平均时间甚至比同步阻塞io模型还要长。如果用户线程只注册自己感兴趣的socket或者io请求,然后去做自己的事情,等到数据到来时再进行处 理,则可以提高cpu的利用率。

io多路复用模型使用了reactor设计模式实现了这一机制。

图4 reactor设计模式

如 图4所示,eventhandler抽象类表示io事件处理器,它拥有io文件句柄handle(通过get_handle获取),以及对handle的 操作handle_event(读/写等)。继承于eventhandler的子类可以对事件处理器的行为进行定制。reactor类用于管理 eventhandler(注册、删除等),并使用handle_events实现事件循环,不断调用同步事件多路分离器(一般是内核)的多路分离函数 select,只要某个文件句柄被激活(可读/写等),select就返回(阻塞),handle_events就会调用与文件句柄关联的事件处理器的 handle_event进行相关操作。

5 io多路复用

如 图5所示,通过reactor的方式,可以将用户线程轮询io操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理 器之后可以继续执行做其他的工作(异步),而reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知 相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路io复用 模型也被称为异步阻塞io模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用io多路复用模型 时,socket都是设置为nonblock的,不过这并不会产生影响,因为用户发起io请求时,数据已经到达了,用户线程一定不会被阻塞。

用户线程使用io多路复用模型的伪代码描述为:

void usereventhandler::handle_event() {
    
if(can_read(socket)) {
        read(socket, buffer);
        process(buffer);
    }
}

{
    reactor.register(
new usereventhandler(socket));
}

用户需要重写eventhandler的handle_event函数进行读取数据、处理数据的工作,用户线程只需要将自己的eventhandler注册到reactor即可。reactor中handle_events事件循环的伪代码大致如下。

reactor::handle_events() {
    
while(1) {
       sockets 
= select();
       
for(socket in sockets) {
            get_event_handler(socket).handle_event();
       }
    }
}

事件循环不断地调用select获取被激活的socket,然后根据获取socket对应的eventhandler,执行器handle_event函数即可。

io多路复用是最常使用的io模型,但是其异步程度还不够“彻底”,因为它使用了会阻塞线程的select系统调用。因此io多路复用只能称为异步阻塞io,而非真正的异步io。

四、异步io

“真 正”的异步io需要操作系统更强的支持。在io多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异 步io模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在io完成后通知用户线程直接使用即可。

异步io模型使用了proactor设计模式实现了这一机制。

图6 proactor设计模式

如 图6,proactor模式和reactor模式在结构上比较相似,不过在用户(client)使用方式上差别较大。reactor模式中,用户线程通过 向reactor对象注册感兴趣的事件监听,然后事件触发时调用事件处理函数。而proactor模式中,用户线程将 asynchronousoperation(读/写等)、proactor以及操作完成时的completionhandler注册到 asynchronousoperationprocessor。asynchronousoperationprocessor使用facade模式提 供了一组异步操作api(读/写等)供用户使用,当用户线程调用异步api后,便继续执行自己的任务。 asynchronousoperationprocessor 会开启独立的内核线程执行异步操作,实现真正的异步。当异步io操作完成 时,asynchronousoperationprocessor将用户线程与asynchronousoperation一起注册的proactor 和completionhandler取出,然后将completionhandler与io操作的结果数据一起转发给 proactor,proactor负责回调每一个异步操作的事件完成处理函数handle_event。虽然proactor模式中每个异步操作都可以 绑定一个proactor对象,但是一般在操作系统中,proactor被实现为singleton模式,以便于集中化分发操作完成事件。

7 异步io

如 图7所示,异步io模型中,用户线程直接使用内核提供的异步io api发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已 经将调用的asynchronousoperation和completionhandler注册到内核,然后操作系统开启独立的内核线程去处理io操 作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的 completionhandler分发给内部proactor,proactor将io完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件 处理函数),完成异步io。

用户线程使用异步io模型的伪代码描述为:


void usercompletionhandler::handle_event(buffer) {
    process(buffer);
}

{
    aio_read(socket, 
new usercompletionhandler);
}

用户需要重写completionhandler的handle_event函数进行处理数据的工作,参数buffer表示proactor已经准备好的数据,用户线程直接调用内核提供的异步io api,并将重写的completionhandler注册即可。

相 比于io多路复用模型,异步io并不十分常用,不少高性能并发服务程序使用io多路复用模型 多线程任务处理的架构基本可以满足需求。况且目前操作系统对 异步io的支持并非特别完善,更多的是采用io多路复用模型模拟异步io的方式(io事件触发时不直接通知用户线程,而是将数据读写完毕后放到用户指定的 缓冲区中)。java7之后已经支持了异步io,感兴趣的读者可以尝试使用。

本文从基本概念、工作流程和代码示 例三个层次简要描述了常见的四种高性能io模型的结构和原理,理清了同步、异步、阻塞、非阻塞这些容易混淆的概念。通过对高性能io模型的理解,可以在服 务端程序的开发中选择更符合实际业务特点的io模型,提高服务质量。希望本文对你有所帮助。


相似的:
http://www.cnblogs.com/nufangrensheng/p/3588690.html
http://www.ibm.com/developerworks/cn/linux/l-async/



dlevin 2015-09-04 15:16 发表评论
]]>
netty3架构解析http://www.blogjava.net/dlevin/archive/2015/09/04/427031.htmldlevindlevinfri, 04 sep 2015 01:40:00 gmthttp://www.blogjava.net/dlevin/archive/2015/09/04/427031.htmlhttp://www.blogjava.net/dlevin/comments/427031.htmlhttp://www.blogjava.net/dlevin/archive/2015/09/04/427031.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/427031.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/427031.html前记很早以前就有读netty源码的打算了,然而第一次尝试的时候从netty4开始,一直抓不到核心的框架流程,后来因为其他事情忙着就放下了。这次趁着休假重新捡起这个硬骨头,因为netty3现在还在被很多项目使用,因而这次决定先从netty3入手,瞬间发现netty3的代码比netty4中规中矩的多,很多概念在代码本身中都有清晰的表达,所以半天就把整个框架的骨架搞清楚了。再读,回去读netty4的源码,反而觉得轻松了,一种豁然开朗的感觉。

记得去年读jetty源码的时候,因为代码太庞大,并且自己的http server的了解太少,因而只能自底向上的一个一个模块的叠加,直到最后把所以的模块连接在一起而看清它的真正核心骨架。现在读源码,开始习惯先把骨架理清,然后延伸到不同的器官、血肉而看清整个人体。

本文从reactor模式在netty3中的应用,引出netty3的整体架构以及控制流程;然而除了reactor模式,netty3还在channelpipeline中使用了intercepting filter模式,这个模式也在servlet的filter中成功使用,因而本文还会从intercepting filter模式出发详细介绍channelpipeline的设计理念。本文假设读者已经对netty有一定的了解,因而不会包含过多入门介绍,以及帮netty做宣传的文字。

netty3中的reactor模式

reactor模式在netty中应用非常成功,因而它也是在netty中受大肆宣传的模式,关于reactor模式可以详细参考本人的另一篇文章《reactor模式详解》,对reactor模式的实现是netty3的基本骨架,因而本小节会详细介绍reactor模式如何应用netty3中。

如果读《reactor模式详解》,我们知道reactor模式由handle、synchronous event demultiplexer、initiation dispatcher、event handler、concrete event handler构成,在java的实现版本中,channel对应handle,selector对应synchronous event demultiplexer,并且netty3还使用了两层reactor:main reactor用于处理client的连接请求,sub reactor用于处理和client连接后的读写请求(关于这个概念还可以参考doug lea的这篇ppt:)。所以我们先要解决netty3中使用什么类实现所有的上述模块并把他们联系在一起的,以nio实现方式为例:

模式是一种抽象,但是在实现中,经常会因为语言特性、框架和性能需要而做一些改变,因而netty3对reactor模式的实现有一套自己的设计:
1. channelevent:reactor是基于事件编程的,因而在netty3中使用channelevent抽象的表达netty3内部可以产生的各种事件,所有这些事件对象在channels帮助类中产生,并且由它将事件推入到channelpipeline中,channelpipeline构建channelhandler管道,channelevent流经这个管道实现所有的业务逻辑处理。channelevent对应的事件有:channelstateevent表示channel状态的变化事件,而如果当前channel存在parent channel,则该事件还会传递到parent channel的channelpipeline中,如open、bound、connected、interest_ops等,该事件可以在各种不同实现的channel、channelsink中产生;messageevent表示从socket中读取数据完成、需要向socket写数据或channelhandler对当前message解析(如decoder、encoder)后触发的事件,它由nioworker、需要对message做进一步处理的channelhandler产生;writecompletionevent表示写完成而触发的事件,它由nioworker产生;exceptionevent表示在处理过程中出现的exception,它可以发生在各个构件中,如channel、channelsink、nioworker、channelhandler中;idlestateevent由idlestatehandler触发,这也是一个channelevent可以无缝扩展的例子。注:在netty4后,已经没有channelevent类,所有不同事件都用对应方法表达,这也意味这channelevent不可扩展,netty4采用在channelinboundhandler中加入usereventtriggered()方法来实现这种扩展,具体可以参考。
2. channelhandler:在netty3中,channelhandler用于表示reactor模式中的eventhandler。channelhandler只是一个标记接口,它有两个子接口:channeldownstreamhandler和channelupstreamhandler,其中channeldownstreamhandler表示从用户应用程序流向netty3内部直到向socket写数据的管道,在netty4中改名为channeloutboundhandler;channelupstreamhandler表示数据从socket进入netty3内部向用户应用程序做数据处理的管道,在netty4中改名为channelinboundhandler。
3. channelpipeline:用于管理channelhandler的管道,每个channel一个channelpipeline实例,可以运行过程中动态的向这个管道中添加、删除channelhandler(由于实现的限制,在最末端的channelhandler向后添加或删除channelhandler不一定在当前执行流程中起效,参考)。channelpipeline内部维护一个channelhandler的双向链表,它以upstream(inbound)方向为正向,downstream(outbound)方向为方向。channelpipeline采用intercepting filter模式实现,具体可以参考这里,这个模式的实现在后一节中还是详细介绍。
4. nioselector:netty3使用nioselector来存放selector(synchronous event demultiplexer),每个新产生的nio channel都向这个selector注册自己以让这个selector监听这个nio channel中发生的事件,当事件发生时,调用帮助类channels中的方法生成channelevent实例,将该事件发送到这个netty channel对应的channelpipeline中,而交给各级channelhandler处理。其中在向selector注册nio channel时,netty channel实例以attachment的形式传入,该netty channel在其内部的nio channel事件发生时,会以attachment的形式存在于selectionkey中,因而每个事件可以直接从这个attachment中获取相关链的netty channel,并从netty channel中获取与之相关联的channelpipeline,这个实现和doug lea的一模一样。另外netty3还采用了中相同的main reactor和sub reactor设计,其中nioselector的两个实现:boss即为main reactor,nioworker为sub reactor。boss用来处理新连接加入的事件,nioworker用来处理各个连接对socket的读写事件,其中boss通过nioworkerpool获取nioworker实例,netty3模式使用roundrobin方式放回nioworker实例。更形象一点的,可以通过的这张图表达:

若与ractor模式对应,nioselector中包含了synchronous event demultiplexer,而channelpipeline中管理着所有eventhandler,因而nioselector和channelpipeline共同构成了initiation dispatcher。
5. channelsink:在channelhandler处理完成所有逻辑需要向客户端写响应数据时,一般会调用netty channel中的write方法,然而在这个write方法实现中,它不是直接向其内部的socket写数据,而是交给channels帮助类,内部创建downstreammessageevent,反向从channelpipeline的管道中流过去,直到第一个channelhandler处理完毕,最后交给channelsink处理,以避免阻塞写而影响程序的吞吐量。channelsink将这个messageevent提交给netty channel中的writebufferqueue,最后nioworker会等到这个nio channel已经可以处理写事件时无阻塞的向这个nio channel写数据。这就是上图的send是从subreactor直接出发的原因。
6. channel:netty有自己的channel抽象,它是一个资源的容器,包含了所有一个连接涉及到的所有资源的饮用,如封装nio channel、channelpipeline、boss、nioworkerpool等。另外它还提供了向内部nio channel写响应数据的接口write、连接/绑定到某个地址的connect/bind接口等,个人感觉虽然对channel本身来说,因为它封装了nio channel,因而这些接口定义在这里是合理的,但是如果考虑到netty的架构,它的channel只是一个资源容器,有这个channel实例就可以得到和它相关的基本所有资源,因而这种write、connect、bind动作不应该再由它负责,而是应该由其他类来负责,比如在netty4中就在channelhandlercontext添加了write方法,虽然netty4并没有删除channel中的write接口。

netty3中的intercepting filter模式

如果说reactor模式是netty3的骨架,那么intercepting filter模式则是netty的中枢。reactor模式主要应用在netty3的内部实现,它是netty3具有良好性能的基础,而intercepting filter模式则是channelhandler组合实现一个应用程序逻辑的基础,只有很好的理解了这个模式才能使用好netty,甚至能得心应手。

关于intercepting filter模式的详细介绍可以参考这里,本节主要介绍netty3中对intercepting filter模式的实现,其实就是defaultchannelpipeline对intercepting filter模式的实现。在上文有提到netty3的channelpipeline是channelhandler的容器,用于存储与管理channelhandler,同时它在netty3中也起到桥梁的作用,即它是连接netty3内部到所有channelhandler的桥梁。作为channelpipeline的实现者defaultchannelpipeline,它使用一个channelhandler的双向链表来存储,以defaultchannelpipelinecontext作为节点:
public interface channelhandlercontext {
    channel getchannel();

    channelpipeline getpipeline();

    string getname();

    channelhandler gethandler();

    
boolean canhandleupstream();
    
boolean canhandledownstream();
    
void sendupstream(channelevent e);
    
void senddownstream(channelevent e);
    object getattachment();

    
void setattachment(object attachment);
}

private final class defaultchannelhandlercontext implements channelhandlercontext {
   
volatile defaultchannelhandlercontext next;
   
volatile defaultchannelhandlercontext prev;
   
private final string name;
   
private final channelhandler handler;
   
private final boolean canhandleupstream;
   
private final boolean canhandledownstream;
   
private volatile object attachment;
.....
}
在defaultchannelpipeline中,它存储了和当前channelpipeline相关联的channel、channelsink以及channelhandler链表的head、tail,所有channelevent通过sendupstream、senddownstream为入口流经整个链表:
public class defaultchannelpipeline implements channelpipeline {
    
private volatile channel channel;
    
private volatile channelsink sink;
    
private volatile defaultchannelhandlercontext head;
    
private volatile defaultchannelhandlercontext tail;
......
    
public void sendupstream(channelevent e) {
        defaultchannelhandlercontext head 
= getactualupstreamcontext(this.head);
        
if (head == null) {
            
return;
        }
        sendupstream(head, e);
    }

    
void sendupstream(defaultchannelhandlercontext ctx, channelevent e) {
        
try {
            ((channelupstreamhandler) ctx.gethandler()).handleupstream(ctx, e);
        } 
catch (throwable t) {
            notifyhandlerexception(e, t);
        }
    }

    
public void senddownstream(channelevent e) {
        defaultchannelhandlercontext tail 
= getactualdownstreamcontext(this.tail);
        
if (tail == null) {
            
try {
                getsink().eventsunk(
this, e);
                
return;
            } 
catch (throwable t) {
                notifyhandlerexception(e, t);
                
return;
            }
        }
        senddownstream(tail, e);
    }

    
void senddownstream(defaultchannelhandlercontext ctx, channelevent e) {
        
if (e instanceof upstreammessageevent) {
            
throw new illegalargumentexception("cannot send an upstream event to downstream");
        }
        
try {
            ((channeldownstreamhandler) ctx.gethandler()).handledownstream(ctx, e);
        } 
catch (throwable t) {
            e.getfuture().setfailure(t);
            notifyhandlerexception(e, t);
        }
    }
对upstream事件,向后找到所有实现了channelupstreamhandler接口的channelhandler组成链(getactualupstreamcontext()),而对downstream事件,向前找到所有实现了channeldownstreamhandler接口的channelhandler组成链(getactualdownstreamcontext()):
    private defaultchannelhandlercontext getactualupstreamcontext(defaultchannelhandlercontext ctx) {
        
if (ctx == null) {
            
return null;
        }
        defaultchannelhandlercontext realctx 
= ctx;
        
while (!realctx.canhandleupstream()) {
            realctx 
= realctx.next;
            
if (realctx == null) {
                
return null;
            }
        }
        
return realctx;
    }
    
private defaultchannelhandlercontext getactualdownstreamcontext(defaultchannelhandlercontext ctx) {
        
if (ctx == null) {
            
return null;
        }
        defaultchannelhandlercontext realctx 
= ctx;
        
while (!realctx.canhandledownstream()) {
            realctx 
= realctx.prev;
            
if (realctx == null) {
                
return null;
            }
        }
        
return realctx;
    }
在实际实现channelupstreamhandler或channeldownstreamhandler时,调用 channelhandlercontext中的sendupstream或senddownstream方法将控制流程交给下一个 channelupstreamhandler或下一个channeldownstreamhandler,或调用channel中的write方法发送 响应消息。
public class mychannelupstreamhandler implements channelupstreamhandler {
    
public void handleupstream(channelhandlercontext ctx, channelevent e) throws exception {
        
// handle current logic, use channel to write response if needed.
        
// ctx.getchannel().write(message);
        ctx.sendupstream(e);
    }
}

public class mychanneldownstreamhandler implements channeldownstreamhandler {
    
public void handledownstream(
            channelhandlercontext ctx, channelevent e) 
throws exception {
        
// handle current logic
        ctx.senddownstream(e);
    }
}
当channelhandler向channelpipelinecontext发送事件时,其内部从当前channelpipelinecontext节点出发找到下一个channelupstreamhandler或channeldownstreamhandler实例,并向其发送channelevent,对于downstream链,如果到达链尾,则将channelevent发送给channelsink:
public void senddownstream(channelevent e) {
    defaultchannelhandlercontext prev 
= getactualdownstreamcontext(this.prev);
   
if (prev == null) {
       
try {
            getsink().eventsunk(defaultchannelpipeline.
this, e);
        } 
catch (throwable t) {
            notifyhandlerexception(e, t);
        }
    } 
else {
        defaultchannelpipeline.
this.senddownstream(prev, e);
    }
}

public void sendupstream(channelevent e) {
    defaultchannelhandlercontext next 
= getactualupstreamcontext(this.next);
   
if (next != null) {
        defaultchannelpipeline.
this.sendupstream(next, e);
    }
}
正是因为这个实现,如果在一个末尾的channelupstreamhandler中先移除自己,在向末尾添加一个新的channelupstreamhandler,它是无效的,因为它的next已经在调用前就固定设置为null了。

channelpipeline作为channelhandler的容器,它还提供了各种增、删、改channelhandler链表中的方法,而且如果某个channelhandler还实现了lifecycleawarechannelhandler,则该channelhandler在被添加进channelpipeline或从中删除时都会得到同志:
public interface lifecycleawarechannelhandler extends channelhandler {
    
void beforeadd(channelhandlercontext ctx) throws exception;
    
void afteradd(channelhandlercontext ctx) throws exception;
    
void beforeremove(channelhandlercontext ctx) throws exception;
    
void afterremove(channelhandlercontext ctx) throws exception;
}

public interface channelpipeline {
    
void addfirst(string name, channelhandler handler);
    
void addlast(string name, channelhandler handler);
    
void addbefore(string basename, string name, channelhandler handler);
    
void addafter(string basename, string name, channelhandler handler);
    
void remove(channelhandler handler);
    channelhandler remove(string name);

    
<extends channelhandler> t remove(class<t> handlertype);
    channelhandler removefirst();

    channelhandler removelast();

    
void replace(channelhandler oldhandler, string newname, channelhandler newhandler);
    channelhandler replace(string oldname, string newname, channelhandler newhandler);

    
<extends channelhandler> t replace(class<t> oldhandlertype, string newname, channelhandler newhandler);
    channelhandler getfirst();

    channelhandler getlast();

    channelhandler get(string name);

    
<extends channelhandler> t get(class<t> handlertype);
    channelhandlercontext getcontext(channelhandler handler);

    channelhandlercontext getcontext(string name);

    channelhandlercontext getcontext(class
 extends channelhandler> handlertype);
    
void sendupstream(channelevent e);
    
void senddownstream(channelevent e);
    channelfuture execute(runnable task);

    channel getchannel();

    channelsink getsink();

    
void attach(channel channel, channelsink sink);
    
boolean isattached();
    list
<string> getnames();
    map
<string, channelhandler> tomap();
}

在defaultchannelpipeline的channelhandler链条的处理流程为:

参考:








dlevin 2015-09-04 09:40 发表评论
]]>
intercepting filter模式详解http://www.blogjava.net/dlevin/archive/2015/09/03/427086.htmldlevindlevinthu, 03 sep 2015 14:14:00 gmthttp://www.blogjava.net/dlevin/archive/2015/09/03/427086.htmlhttp://www.blogjava.net/dlevin/comments/427086.htmlhttp://www.blogjava.net/dlevin/archive/2015/09/03/427086.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/427086.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/427086.html问题描述在服务器编程中,通常需要处理多种不同的请求,在正式处理请求之前,需要对请求做一些预处理,如:
  1. 纪录每个client的每次访问信息。
  2. 对client进行认证和授权检查(authentication and authorization)。
  3. 检查当前session是否合法。
  4. 检查client的ip地址是否可信赖或不可信赖(ip地址白名单、黑名单)。
  5. 请求数据是否先要解压或解码。
  6. 是否支持client请求的类型、browser版本等。
  7. 添加性能监控信息。
  8. 添加调试信息。
  9. 保证所有异常都被正确捕获到,对未预料到的异常做通用处理,防止给client看到内部堆栈信息。

在响应返回给客户端之前,有时候也需要做一些预处理再返回:

  1. 对响应消息编码或压缩。
  2. 为所有响应添加公共头、尾等消息。
  3. 进一步enrich响应消息,如添加公共字段、session信息、cookie信息,甚至完全改变响应消息等。
如何实现这样的需求,同时保持可扩展性、可重用性、可配置、移植性?

问题解决

要实现这种需求,最直观的方法就是在每个请求处理过程中添加所有这些逻辑,为了减少代码重复,可以将所有这些检查提取成方法,这样在每个处理方法中调用即可:
public response service1(request request) {
    validate(request);
    request 
= transform(request);
    response response 
= process1(request);
    
return transform(response);
}
此时,如果出现service2方法,依然需要拷贝service1中的实现,然后将process1换成process2即可。这个时候我们发现很多重复代码,继续对它重构,比如提取公共逻辑到基类成模版方法,这种使用继承的方式会引起子类对父类的耦合,如果要让某些模块变的可配置需要有太多的判断逻辑,代码变的臃肿;因而可以更进一步,将所有处理逻辑抽象出一个processor接口,然后使用decorate模式(即引用优于继承):
public interface processor {
    response process(request request);
}
public class coreprocessor implements processor {
    
public response process(request request) {
        
// do process/calculation
    }
}
public class decoratedprocessor implements processor {
    
private final processor innerprocessor;
    
public decoratedprocessor(processor processor) {
        
this.innerprocessor = processor;
    }

    
public response process(request request) {
        request 
= preprocess(request);
        response response 
= innerprocessor.process(request);
        response 
= postprocess(response);
        
return response;
    }

    
protected request preprocess(request request) {
        
return request;
    }
    
protected response postprocess(response response) {
        
return response;
    }
}

public void transformer extends decoratedprocessor {
    
public transformer(processor processor) {
        
super(processor);
    }

    
protected request preprocess(request request) {
        
return transformrequest(request);
    }
    
protected response postprocess(response response) {
        
return transformresponse(response);
    }
}
此时,如果需要在真正的处理逻辑之前加入其他的预处理逻辑,只需要继承decoratedprocessor,实现preprocess或postprocess方法,分别在请求处理之前和请求处理之后横向切入一些逻辑,也就是所谓的aop编程:面向切面的编程,然后只需要根据需求构建这个链条:
processor processor = new missingexceptioncatcher(new debugger(new transformer(new coreprocessor());
response response 
= processor.process(request);
......
这已经是相对比较好的设计了,每个processor只需要关注自己的实现逻辑即可,代码变的简洁;并且每个processor各自独立,可重用性好,测试方便;整条链上能实现的功能只是取决于链的构造,因而只需要有一种方法配置链的构造即可,可配置性也变得灵活;然而很多时候引用是一种静态的依赖,而无法满足动态的需求。要构造这条链,每个前置processor需要知道其后的processor,这在某些情况下并不是在起初就知道的。此时,我们需要引入intercepting filter模式来实现动态的改变条链。

intercepting filter模式

在前文已经构建了一条由引用而成的processor链,然而这是一条静态链,并且需要一开始就能构造出这条链,为了解决这个限制,我们可以引入一个processorchain来维护这条链,并且这条链可以动态的构建。

有多种方式可以实现并控制这个链:
  1. 在存储上,可以使用数组来存储所有的processor,processor在数组中的位置表示这个processor在链条中的位置;也可以用链表来存储所有的processor,此时processor在这个链表中的位置即是在链中的位置。
  2. 在抽象上,可以所有的逻辑都封装在processor中,也可以将核心逻辑使用processor抽象,而外围逻辑使用filter抽象。
  3. 在流程控制上,一般通过在processor实现方法中直接使用processorchain实例(通过参数掺入)来控制流程,利用方法调用的进栈出栈的特性实现preprocess()和postprocess()处理。
在实际中使用这个模式的有:servlet的filter机制、netty的channelpipeline中、structs2中的interceptor中都实现了这个模式。

intercepting filter模式在servlet的filter中的实现(jetty版本)

其中servlet的filter在jetty的实现中使用数组存储filter,filter末尾可以使用servlet实例处理真正的业务逻辑,在流程控制上,使用filterchain的dofilter方法来实现。如filterchain在jetty中的实现:
public void dofilter(servletrequest request, servletresponse response) throws ioexception, servletexception
   
// pass to next filter
    if (_filter < lazylist.size(_chain)) {
        filterholder holder
= (filterholder)lazylist.get(_chain, _filter);
        filter filter= holder.getfilter();
        filter.dofilter(request, response, this);                   
       
return;
    }

   
// call servlet
    httpservletrequest srequest = (httpservletrequest)request;
   
if (_servletholder != null) {
        _servletholder.handle(_baserequest,request, response);

    }
}
这里,_chain实际上是一个filter的arraylist,由filterchain调用dofilter()启动调用第一个filter的dofilter()方法,在实际的filter实现中,需要手动的调用filterchain.dofilter()方法来启动下一个filter的调用,利用方法调用的进栈出栈的特性实现request的pre-process和response的post-process处理。如果不调用filterchain.dofilter()方法,则表示不需要调用之后的filter,流程从当前filter返回,在它之前的filter的filterchain.dofilter()调用之后的逻辑反向处理直到第一个filter处理完成而返回。
public class myfilter implements filter {
    
public void dofilter(servletrequest request, servletresponse response, filterchain chain) throws ioexception, servletexception {
        
// pre-process servletrequest
        chain.dofilter(request, response);
        
// post-process servlet response
    }
}
整个filter链的处理流程如下:

intercepting filter模式在netty3中的实现

netty3在defaultchannelpipeline中实现了intercepting filter模式,其中channelhandler是它的filter。在netty3的defaultchannelpipeline中,使用一个以channelhandlercontext为节点的双向链表来存储channelhandler,所有的横切面逻辑和实际业务逻辑都用channelhandler表达,在控制流程上使用channelhandlercontext的senddownstream()和sendupstream()方法来控制流程。不同于servlet的filter,channelhandler有两个子接口:channelupstreamhandler和channeldownstreamhandler分别用来请求进入时的处理流程和响应出去时的处理流程。对于client的请求,从defaultchannelpipeline的sendupstream()方法入口:
public void senddownstream(channelevent e) {
    defaultchannelhandlercontext tail 
= getactualdownstreamcontext(this.tail);
   
if (tail == null) {
       
try {
            getsink().eventsunk(
this, e);
           
return;
        } 
catch (throwable t) {
            notifyhandlerexception(e, t);
           
return;
        }
    }
    senddownstream(tail, e);
}
void senddownstream(defaultchannelhandlercontext ctx, channelevent e) {
   
if (e instanceof upstreammessageevent) {
       
throw new illegalargumentexception("cannot send an upstream event to downstream");
    }
   
try {
        ((channeldownstreamhandler) ctx.gethandler()).handledownstream(ctx, e)
     } 
catch (throwable t) {
        e.getfuture().setfailure(t);
        notifyhandlerexception(e, t);
    }
}
如果有响应消息,该消息从defaultchannelpipeline的senddownstream()方法为入口:
public void sendupstream(channelevent e) {
    defaultchannelhandlercontext head 
= getactualupstreamcontext(this.head);
   
if (head == null) {
        return;
    }
    sendupstream(head, e);
}
void sendupstream(defaultchannelhandlercontext ctx, channelevent e) {
   
try {
        ((channelupstreamhandler) ctx.gethandler()).handleupstream(ctx, e);
    } 
catch (throwable t) {
        notifyhandlerexception(e, t);
    }
}
在实际实现channelupstreamhandler或channeldownstreamhandler时,调用channelhandlercontext中的sendupstream或senddownstream方法将控制流程交给下一个channelupstreamhandler或下一个channeldownstreamhandler,或调用channel中的write方法发送响应消息。
public class mychannelupstreamhandler implements channelupstreamhandler {
    
public void handleupstream(channelhandlercontext ctx, channelevent e) throws exception {
        
// handle current logic, use channel to write response if needed.
        
// ctx.getchannel().write(message);
        ctx.sendupstream(e);
    }
}

public class mychanneldownstreamhandler implements channeldownstreamhandler {
    
public void handledownstream(
            channelhandlercontext ctx, channelevent e) 
throws exception {
        
// handle current logic
        ctx.senddownstream(e);
    }
}
当channelhandler向channelpipelinecontext发送事件时,其内部从当前channelpipelinecontext 节点出发找到下一个channelupstreamhandler或channeldownstreamhandler实例,并向其发送 channelevent,对于downstream链,如果到达链尾,则将channelevent发送给channelsink:
public void senddownstream(channelevent e) {
    defaultchannelhandlercontext prev 
= getactualdownstreamcontext(this.prev);
   
if (prev == null) {
       
try {
            getsink().eventsunk(defaultchannelpipeline.
this, e);
        } 
catch (throwable t) {
            notifyhandlerexception(e, t);
        }
    } 
else {
        defaultchannelpipeline.
this.senddownstream(prev, e);
    }
}

public void sendupstream(channelevent e) {
    defaultchannelhandlercontext next 
= getactualupstreamcontext(this.next);
   
if (next != null) {
        defaultchannelpipeline.
this.sendupstream(next, e);
    }
}
正是因为这个实现,如果在一个末尾的channelupstreamhandler中先移除自己,在向末尾添加一个新的channelupstreamhandler,它是无效的,因为它的next已经在调用前就固定设置为null了。

在defaultchannelpipeline的channelhandler链条的处理流程为:

在这个实现中,不像servlet的filter实现利用方法调用栈的进出栈来完成pre-process和post-process,而是在进去的链和出来的链各自调用handleupstream()和handledownstream()方法,这样会引起调用栈其实是两条链的总和,因而需要注意这条链的总长度。这样做的好处是这条channelhandler的链不依赖于方法调用栈,而是在defaultchannelpipeline内部本身的链,因而在handleupstream()或handledownstream()可以随时将执行流程转发给其他线程或线程池,只需要保留channelpipelinecontext引用,在处理完成后用这个channelpipelinecontext重新向这条链的后一个节点发送channelevent,然而由于servlet的filter依赖于方法的调用栈,因而方法返回意味着所有执行完成,这种限制在异步编程中会引起问题,因而servlet在3.0后引入了async的支持。

intercepting filter模式的缺点

简单提一下这个模式的缺点:
1. 相对传统的编程模型,这个模式有一定的学习曲线,需要很好的理解该模式后才能灵活的应用它来编程。
2. 需要划分不同的逻辑到不同的filter中,这有些时候并不是那么容易。
3. 各个filter之间共享数据将变得困难。在netty3中可以自定义自己的channelevent来实现自定义消息的传输,或者使用channelpipelinecontext的attachment字段来实现消息传输,而servlet中的filter则没有提供类似的机制,如果不是可以配置的数据在config中传递,其他时候的数据共享需要其他机制配合完成。

参考



dlevin 2015-09-03 22:14 发表评论
]]>
reactor模式详解http://www.blogjava.net/dlevin/archive/2015/09/02/427045.htmldlevindlevinwed, 02 sep 2015 07:14:00 gmthttp://www.blogjava.net/dlevin/archive/2015/09/02/427045.htmlhttp://www.blogjava.net/dlevin/comments/427045.htmlhttp://www.blogjava.net/dlevin/archive/2015/09/02/427045.html#feedback5http://www.blogjava.net/dlevin/comments/commentrss/427045.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/427045.html 前记 第一次听到reactor模式是三年前的某个晚上,一个室友突然跑过来问我什么是reactor模式?我上网查了一下,很多人都是给出nio中的 selector的例子,而且就是nio里selector多路复用模型,只是给它起了一个比较fancy的名字而已,虽然它引入了eventloop概 念,这对我来说是新的概念,但是代码实现却是一样的,因而我并没有很在意这个模式。然而最近开始读netty源码,而reactor模式是很多介绍netty的文章中被大肆宣传的模式,因而我再次问自己,什么是reactor模式?本文就是对这个问题关于我的一些理解和尝试着来解答。

什么是reactor模式

要回答这个问题,首先当然是求助google或wikipedia,其中wikipedia上说:“the reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. the service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.”。从这个描述中,我们知道reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个service handler,有多个request handlers;这个service handler会同步的将输入的请求(event)多路复用的分发给相应的request handler。如果用图来表达:

从结构上,这有点类似生产者消费者模式,即有一个或多个生产者将事件放入一个queue中,而一个或多个消费者主动的从这个queue中poll事件来处理;而reactor模式则并没有queue来做缓冲,每当一个event输入到service handler之后,该service handler会主动的根据不同的event类型将其分发给对应的request handler来处理。

更学术的,这篇文章()上说:“the reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. each service in an application may consistent of several methods and is represented by a separate event handler that is responsible for dispatching service-specific requests. dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. demultiplexing of service requests is performed by a synchronous event demultiplexer. also known as dispatcher, notifier”。这段描述和wikipedia上的描述类似,有多个输入源,有多个不同的eventhandler(requesthandler)来处理不同的请求,initiation dispatcher用于管理eventhander,eventhandler首先要注册到initiation dispatcher中,然后initiation dispatcher根据输入的event分发给注册的eventhandler;然而initiation dispatcher并不监听event的到来,这个工作交给synchronous event demultiplexer来处理。

reactor模式结构

在解决了什么是reactor模式后,我们来看看reactor模式是由什么模块构成。图是一种比较简洁形象的表现方式,因而先上一张图来表达各个模块的名称和他们之间的关系:

handle:即操作系统中的句柄,是对资源在操作系统层面上的一种抽象,它可以是打开的文件、一个连接(socket)、timer等。由于reactor模式一般使用在网络编程中,因而这里一般指socket handle,即一个网络连接(connection,在java nio中的channel)。这个channel注册到synchronous event demultiplexer中,以监听handle中发生的事件,对serversocketchannnel可以是connect事件,对socketchannel可以是read、write、close事件等。
synchronous event demultiplexer:阻塞等待一系列的handle中的事件到来,如果阻塞等待返回,即表示在返回的handle中可以不阻塞的执行返回的事件类型。这个模块一般使用操作系统的select来实现。在java nio中用selector来封装,当selector.select()返回时,可以调用selector的selectedkeys()方法获取set,一个selectionkey表达一个有事件发生的channel以及该channel上的事件类型。上图的“synchronous event demultiplexer ---notifies--> handle”的流程如果是对的,那内部实现应该是select()方法在事件到来后会先设置handle的状态,然后返回。不了解内部实现机制,因而保留原图。
initiation dispatcher:用于管理event handler,即eventhandler的容器,用以注册、移除eventhandler等;另外,它还作为reactor模式的入口调用synchronous event demultiplexer的select方法以阻塞等待事件返回,当阻塞等待返回时,根据事件发生的handle将其分发给对应的event handler处理,即回调eventhandler中的handle_event()方法。
event handler:定义事件处理方法:handle_event(),以供initiationdispatcher回调使用。
concrete event handler:事件eventhandler接口,实现特定事件处理逻辑。

reactor模式模块之间的交互

简单描述一下reactor各个模块之间的交互流程,先从序列图开始:

1. 初始化initiationdispatcher,并初始化一个handle到eventhandler的map。
2. 注册eventhandler到initiationdispatcher中,每个eventhandler包含对相应handle的引用,从而建立handle到eventhandler的映射(map)。
3. 调用initiationdispatcher的handle_events()方法以启动event loop。在event loop中,调用select()方法(synchronous event demultiplexer)阻塞等待event发生。
4. 当某个或某些handle的event发生后,select()方法返回,initiationdispatcher根据返回的handle找到注册的eventhandler,并回调该eventhandler的handle_events()方法。
5. 在eventhandler的handle_events()方法中还可以向initiationdispatcher中注册新的eventhandler,比如对acceptoreventhandler来,当有新的client连接时,它会产生新的eventhandler以处理新的连接,并注册到initiationdispatcher中。

reactor模式实现

在中,一直以logging server来分析reactor模式,这个logging server的实现完全遵循这里对reactor描述,因而放在这里以做参考。logging server中的reactor模式实现分两个部分:client连接到logging server和client向logging server写log。因而对它的描述分成这两个步骤。
client连接到logging server

1. logging server注册loggingacceptor到initiationdispatcher。
2. logging server调用initiationdispatcher的handle_events()方法启动。
3. initiationdispatcher内部调用select()方法(synchronous event demultiplexer),阻塞等待client连接。
4. client连接到logging server。
5. initiationdisptcher中的select()方法返回,并通知loggingacceptor有新的连接到来。
6. loggingacceptor调用accept方法accept这个新连接。
7. loggingacceptor创建新的logginghandler。
8. 新的logginghandler注册到initiationdispatcher中(同时也注册到synchonous event demultiplexer中),等待client发起写log请求。
client向logging server写log

1. client发送log到logging server。
2. initiationdispatcher监测到相应的handle中有事件发生,返回阻塞等待,根据返回的handle找到logginghandler,并回调logginghandler中的handle_event()方法。
3. logginghandler中的handle_event()方法中读取handle中的log信息。
4. 将接收到的log写入到日志文件、数据库等设备中。
3.4步骤循环直到当前日志处理完成。
5. 返回到initiationdispatcher等待下一次日志写请求。

在有对reactor模式的c 的实现版本,多年不用c ,因而略过。 

java nio对reactor的实现

在java的nio中,对reactor模式有无缝的支持,即使用selector类封装了操作系统提供的synchronous event demultiplexer功能。这个doug lea已经在中有非常深入的解释了,因而不再赘述,另外对doug lea的有一些简单解释,至少它的代码格式比doug lea的ppt要整洁一些。

需要指出的是,不同这里使用initiationdispatcher来管理eventhandler,在doug lea的版本中使用selectionkey中的attachment来存储对应的eventhandler,因而不需要注册eventhandler这个步骤,或者设置attachment就是这里的注册。而且在这篇文章中,doug lea从单线程的reactor、acceptor、handler实现这个模式出发;演化为将handler中的处理逻辑多线程化,实现类似proactor模式,此时所有的io操作还是单线程的,因而再演化出一个main reactor来处理connect事件(acceptor),而多个sub reactor来处理read、write等事件(handler),这些sub reactor可以分别再自己的线程中执行,从而io操作也多线程化。这个最后一个模型正是netty中使用的模型。并且在的9.5 determine the number of initiation dispatchers in an application中也有相应的描述。

eventhandler接口定义

对eventhandler的定义有两种设计思路:single-method设计和multi-method设计:
a single-method interface:它将event封装成一个event object,eventhandler只定义一个handle_event(event event)方法。这种设计的好处是有利于扩展,可以后来方便的添加新的event类型,然而在子类的实现中,需要判断不同的event类型而再次扩展成 不同的处理方法,从这个角度上来说,它又不利于扩展。另外在netty3的使用过程中,由于它不停的创建channelevent类,因而会引起gc的不稳定。
a multi-method interface:这种设计是将不同的event类型在 eventhandler中定义相应的方法。这种设计就是netty4中使用的策略,其中一个目的是避免channelevent创建引起的gc不稳定, 另外一个好处是它可以避免在eventhandler实现时判断不同的event类型而有不同的实现,然而这种设计会给扩展新的event类型时带来非常 大的麻烦,因为它需要该接口。

关于netty4对netty3的改进可以参考:
channelhandler with no event objectin 3.x, every i/o operation created a channelevent object. for each read / write, it additionally created a new channelbuffer. it simplified the internals of netty quite a lot because it delegates resource management and buffer pooling to the jvm. however, it often was the root cause of gc pressure and uncertainty which are sometimes observed in a netty-based application under high load.

4.0 removes event object creation almost completely by replacing the event objects with strongly typed method invocations. 3.x had catch-all event handler methods such as handleupstream() and handledownstream(), but this is not the case anymore. every event type has its own handler method now:

为什么使用reactor模式

归功与netty和java nio对reactor的宣传,本文慕名而学习的reactor模式,因而已经默认reactor具有非常优秀的性能,然而慕名归慕名,到这里,我还是要不得不问自己reactor模式的好处在哪里?即为什么要使用这个reactor模式?在中是这么说的:
reactor pattern优点

separation of concerns: the reactor pattern decouples application-independent demultiplexing and dispatching mechanisms from application-specific hook method functionality. the application-independent mechanisms become reusable components that know how to demultiplex events and dispatch the appropriate hook methods defined by event handlers. in contrast, the application-specific functionality in a hook method knows how to perform a particular type of service.

improve modularity, reusability, and configurability of event-driven applications: the pattern decouples application functionality into separate classes. for instance, there are two separate classes in the logging server: one for establishing connections and another for receiving and processing logging records. this decoupling enables the reuse of the connection establishment class for different types of connection-oriented services (such as file transfer, remote login, and video-on-demand). therefore, modifying or extending the functionality of the logging server only affects the implementation of the logging handler class.

improves application portability: the initiation dispatcher’s interface can be reused independently of the os system calls that perform event demultiplexing. these system calls detect and report the occurrence of one or more events that may occur simultaneously on multiple sources of events. common sources of events may in- clude i/o handles, timers, and synchronization objects. on unix platforms, the event demultiplexing system calls are called select and poll [1]. in the win32 api [16], the waitformultipleobjects system call performs event demultiplexing.

provides coarse-grained concurrency control: the reactor pattern serializes the invocation of event handlers at the level of event demultiplexing and dispatching within a process or thread. serialization at the initiation dispatcher level often eliminates the need for more complicated synchronization or locking within an application process.

这些貌似是很多模式的共性:解耦、提升复用性、模块化、可移植性、事件驱动、细力度的并发控制等,因而并不能很好的说明什么,特别是它鼓吹的对性能的提升,这里并没有体现出来。当然在这篇文章的开头有描述过另一种直观的实现:thread-per-connection,即传统的实现,提到了这个传统实现的以下问题:
thread per connection缺点

efficiency: threading may lead to poor performance due to context switching, synchronization, and data movement [2];

programming simplicity: threading may require complex concurrency control schemes;

portability: threading is not available on all os platforms.
对于性能,它其实就是第一点关于efficiency的描述,即线程的切换、同步、数据的移动会引起性能问题。也就是说从性能的角度上,它最大的提升就是减少了性能的使用,即不需要每个client对应一个线程。我的理解,其他业务逻辑处理很多时候也会用到相同的线程,io读写操作相对cpu的操作还是要慢很多,即使reactor机制中每次读写已经能保证非阻塞读写,这里可以减少一些线程的使用,但是这减少的线程使用对性能有那么大的影响吗?答案貌似是肯定的,这篇论文()对随着线程的增长带来性能降低做了一个统计:

在这个统计中,每个线程从磁盘中读8kb数据,每个线程读同一个文件,因而数据本身是缓存在操作系统内部的,即减少io的影响;所有线程是事先分配的,不会有线程启动的影响;所有任务在测试内部产生,因而不会有网络的影响。该统计数据运行环境:linux 2.2.14,2gb内存,4-way 500mhz pentium iii。从图中可以看出,随着线程的增长,吞吐量在线程数为8个左右的时候开始线性下降,并且到64个以后而迅速下降,其相应事件也在线程达到256个后指数上升。即1 1<2,因为线程切换、同步、数据移动会有性能损失,线程数增加到一定数量时,这种性能影响效果会更加明显。

对于这点,还可以参考,用以描述同时有10k个client发起连接的问题,到2010年的时候已经出现10m problem了。

当然也有人说:.在不久的将来可能又会发生不同的变化,或者这个变化正在、已经发生着?没有做过比较仔细的测试,因而不敢随便断言什么,然而本人观点,即使线程变的影响并没有以前那么大,使用reactor模式,甚至时seda模式来减少线程的使用,再加上其他解耦、模块化、提升复用性等优点,还是值得使用的。

reactor模式的缺点

reactor模式的缺点貌似也是显而易见的:
1. 相比传统的简单模型,reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
2. reactor模式需要底层的synchronous event demultiplexer支持,比如java中的selector支持,操作系统的select系统调用支持,如果要自己实现synchronous event demultiplexer可能不会有那么高效。
3. reactor模式在io读写数据时还是在同一个线程中实现的,即使使用多个reactor机制的情况下,那些共享一个reactor的channel如果出现一个长时间的数据读写,会影响这个reactor中其他channel的相应时间,比如在大文件传输时,io操作就会影响其他client的相应时间,因而对这种操作,使用传统的thread-per-connection或许是一个更好的选择,或则此时使用proactor模式。

参考







dlevin 2015-09-02 15:14 发表评论
]]>
深入hbase架构解析(二)http://www.blogjava.net/dlevin/archive/2015/08/22/426950.htmldlevindlevinsat, 22 aug 2015 11:40:00 gmthttp://www.blogjava.net/dlevin/archive/2015/08/22/426950.htmlhttp://www.blogjava.net/dlevin/comments/426950.htmlhttp://www.blogjava.net/dlevin/archive/2015/08/22/426950.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/426950.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/426950.html 前言这是《深入hbase架构解析(一)》的续,不多废话,继续。。。。

hbase读的实现

通过前文的描述,我们知道在hbase写时,相同cell(rowkey/columnfamily/column相同)并不保证在一起,甚至删除一个cell也只是写入一个新的cell,它含有delete标记,而不一定将一个cell真正删除了,因而这就引起了一个问题,如何实现读的问题?要解决这个问题,我们先来分析一下相同的cell可能存在的位置:首先对新写入的cell,它会存在于memstore中;然后对之前已经flush到hdfs中的cell,它会存在于某个或某些storefile(hfile)中;最后,对刚读取过的cell,它可能存在于blockcache中。既然相同的cell可能存储在三个地方,在读取的时候只需要扫瞄这三个地方,然后将结果合并即可(merge read),在hbase中扫瞄的顺序依次是:blockcache、memstore、storefile(hfile)。其中storefile的扫瞄先会使用bloom filter过滤那些不可能符合条件的hfile,然后使用block index快速定位cell,并将其加载到blockcache中,然后从blockcache中读取。我们知道一个hstore可能存在多个storefile(hfile),此时需要扫瞄多个hfile,如果hfile过多又是会引起性能问题。

compaction

memstore每次flush会创建新的hfile,而过多的hfile会引起读的性能问题,那么如何解决这个问题呢?hbase采用compaction机制来解决这个问题,有点类似java中的gc机制,起初java不停的申请内存而不释放,增加性能,然而天下没有免费的午餐,最终我们还是要在某个条件下去收集垃圾,很多时候需要stop-the-world,这种stop-the-world有些时候也会引起很大的问题,比如参考本人写的这篇文章,因而设计是一种权衡,没有完美的。还是类似java中的gc,在hbase中compaction分为两种:minor compaction和major compaction。
  1. minor compaction是指选取一些小的、相邻的storefile将他们合并成一个更大的storefile,在这个过程中不会处理已经deleted或expired的cell。一次minor compaction的结果是更少并且更大的storefile。(这个是对的吗?bigtable中是这样描述minor compaction的:as write operations execute, the size of the memtable in- creases. when the memtable size reaches a threshold, the memtable is frozen, a new memtable is created, and the frozen memtable is converted to an sstable and written to gfs. this minor compaction process has two goals: it shrinks the memory usage of the tablet server, and it reduces the amount of data that has to be read from the commit log during recovery if this server dies. incom- ing read and write operations can continue while com- pactions occur. 也就是说它将memtable的数据flush的一个hfile/sstable称为一次minor compaction)
  2. major compaction是指将所有的storefile合并成一个storefile,在这个过程中,标记为deleted的cell会被删除,而那些已经expired的cell会被丢弃,那些已经超过最多版本数的cell会被丢弃。一次major compaction的结果是一个hstore只有一个storefile存在。major compaction可以手动或自动触发,然而由于它会引起很多的io操作而引起性能问题,因而它一般会被安排在周末、凌晨等集群比较闲的时间。
更形象一点,如下面两张图分别表示minor compaction和major compaction。

hregion split

最初,一个table只有一个hregion,随着数据写入增加,如果一个hregion到达一定的大小,就需要split成两个hregion,这个大小由hbase.hregion.max.filesize指定,默认为10gb。当split时,两个新的hregion会在同一个hregionserver中创建,它们各自包含父hregion一半的数据,当split完成后,父hregion会下线,而新的两个子hregion会向hmaster注册上线,处于负载均衡的考虑,这两个新的hregion可能会被hmaster分配到其他的hregionserver中。关于split的详细信息,可以参考这篇文章:。

hregion负载均衡

在hregion split后,两个新的hregion最初会和之前的父hregion在相同的hregionserver上,出于负载均衡的考虑,hmaster可能会将其中的一个甚至两个重新分配的其他的hregionserver中,此时会引起有些hregionserver处理的数据在其他节点上,直到下一次major compaction将数据从远端的节点移动到本地节点。


hregionserver recovery

当一台hregionserver宕机时,由于它不再发送heartbeat给zookeeper而被监测到,此时zookeeper会通知hmaster,hmaster会检测到哪台hregionserver宕机,它将宕机的hregionserver中的hregion重新分配给其他的hregionserver,同时hmaster会把宕机的hregionserver相关的wal拆分分配给相应的hregionserver(将拆分出的wal文件写入对应的目的hregionserver的wal目录中,并并写入对应的datanode中),从而这些hregionserver可以replay分到的wal来重建memstore。


hbase架构简单总结

在nosql中,存在著名的cap理论,即consistency、availability、partition tolerance不可全得,目前市场上基本上的nosql都采用partition tolerance以实现数据得水平扩展,来处理relational database遇到的无法处理数据量太大的问题,或引起的性能问题。因而只有剩下c和a可以选择。hbase在两者之间选择了consistency,然后使用多个hmaster以及支持hregionserver的failure监控、zookeeper引入作为协调者等各种手段来解决availability问题,然而当网络的split-brain(network partition)发生时,它还是无法完全解决availability的问题。从这个角度上,cassandra选择了a,即它在网络split-brain时还是能正常写,而使用其他技术来解决consistency的问题,如读的时候触发consistency判断和处理。这是设计上的限制。

从实现上的优点:
  1. hbase采用强一致性模型,在一个写返回后,保证所有的读都读到相同的数据。
  2. 通过hregion动态split和merge实现自动扩展,并使用hdfs提供的多个数据备份功能,实现高可用性。
  3. 采用hregionserver和datanode运行在相同的服务器上实现数据的本地化,提升读写性能,并减少网络压力。
  4. 内建hregionserver的宕机自动恢复。采用wal来replay还未持久化到hdfs的数据。
  5. 可以无缝的和hadoop/mapreduce集成。
实现上的缺点:
  1. wal的replay过程可能会很慢。
  2. 灾难恢复比较复杂,也会比较慢。
  3. major compaction会引起io storm。
  4. 。。。。

参考:

https://www.mapr.com/blog/in-depth-look-hbase-architecture#.vdnsn6yp3qx
http://jimbojw.com/wiki/index.php?title=understanding_hbase_and_bigtable
http://hbase.apache.org/book.html
http://www.searchtb.com/2011/01/understanding-hbase.html
http://research.google.com/archive/bigtable-osdi06.pdf

dlevin 2015-08-22 19:40 发表评论
]]>
深入hbase架构解析(一)http://www.blogjava.net/dlevin/archive/2015/08/22/426877.htmldlevindlevinsat, 22 aug 2015 09:44:00 gmthttp://www.blogjava.net/dlevin/archive/2015/08/22/426877.htmlhttp://www.blogjava.net/dlevin/comments/426877.htmlhttp://www.blogjava.net/dlevin/archive/2015/08/22/426877.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/426877.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/426877.html前记 公司内部使用的是mapr版本的hadoop生态系统,因而从mapr的凯发k8网页登录官网看到了这篇文文章:,原本想翻译全文,然而如果翻译就需要各种咬文嚼字,太麻烦,因而本文大部分使用了自己的语言,并且加入了其他资源的参考理解以及本人自己读源码时对其的理解,属于半翻译、半原创吧。

hbase架构组成

hbase采用master/slave架构搭建集群,它隶属于hadoop生态系统,由一下类型节点组成:hmaster节点、hregionserver节点、zookeeper集群,而在底层,它将数据存储于hdfs中,因而涉及到hdfs的namenode、datanode等,总体结构如下:

其中hmaster节点用于:
  1. 管理hregionserver,实现其负载均衡。
  2. 管理和分配hregion,比如在hregion split时分配新的hregion;在hregionserver退出时迁移其内的hregion到其他hregionserver上。
  3. 实现ddl操作(data definition language,namespace和table的增删改,column familiy的增删改等)。
  4. 管理namespace和table的元数据(实际存储在hdfs上)。
  5. 权限控制(acl)。
hregionserver节点用于:
  1. 存放和管理本地hregion。
  2. 读写hdfs,管理table中的数据。
  3. client直接通过hregionserver读写数据(从hmaster中获取元数据,找到rowkey所在的hregion/hregionserver后)。
zookeeper集群是协调系统,用于:
  1. 存放整个 hbase集群的元数据以及集群的状态信息。
  2. 实现hmaster主从节点的failover。
hbase client通过rpc方式和hmaster、hregionserver通信;一个hregionserver可以存放1000个hregion;底层table数据存储于hdfs中,而hregion所处理的数据尽量和数据所在的datanode在一起,实现数据的本地化;数据本地化并不是总能实现,比如在hregion移动(如因split)时,需要等下一次compact才能继续回到本地化。

本着半翻译的原则,再贴一个《an in-depth look at the hbase architecture》的架构图:

这个架构图比较清晰的表达了hmaster和namenode都支持多个热备份,使用zookeeper来做协调;zookeeper并不是云般神秘,它一般由三台机器组成一个集群,内部使用paxos算法支持三台server中的一台宕机,也有使用五台机器的,此时则可以支持同时两台宕机,既少于半数的宕机,然而随着机器的增加,它的性能也会下降;regionserver和datanode一般会放在相同的server上实现数据的本地化。

hregion

hbase使用rowkey将表水平切割成多个hregion,从hmaster的角度,每个hregion都纪录了它的startkey和endkey(第一个hregion的startkey为空,最后一个hregion的endkey为空),由于rowkey是排序的,因而client可以通过hmaster快速的定位每个rowkey在哪个hregion中。hregion由hmaster分配到相应的hregionserver中,然后由hregionserver负责hregion的启动和管理,和client的通信,负责数据的读(使用hdfs)。每个hregionserver可以同时管理1000个左右的hregion(这个数字怎么来的?没有从代码中看到限制,难道是出于经验?超过1000个会引起性能问题?来回答这个问题:感觉这个1000的数字是从bigtable的论文中来的(5 implementation节):each tablet server manages a set of tablets(typically we have somewhere between ten to a thousand tablets per tablet server))。

hmaster

hmaster没有单点故障问题,可以启动多个hmaster,通过zookeeper的master election机制保证同时只有一个hmaster出于active状态,其他的hmaster则处于热备份状态。一般情况下会启动两个hmaster,非active的hmaster会定期的和active hmaster通信以获取其最新状态,从而保证它是实时更新的,因而如果启动了多个hmaster反而增加了active hmaster的负担。前文已经介绍过了hmaster的主要用于hregion的分配和管理,ddl(data definition language,既table的新建、删除、修改等)的实现等,既它主要有两方面的职责:
  1. 协调hregionserver
    1. 启动时hregion的分配,以及负载均衡和修复时hregion的重新分配。
    2. 监控集群中所有hregionserver的状态(通过heartbeat和监听zookeeper中的状态)。
  2. admin职能
    1. 创建、删除、修改table的定义。

zookeeper:协调者

zookeeper为hbase集群提供协调服务,它管理着hmaster和hregionserver的状态(available/alive等),并且会在它们宕机时通知给hmaster,从而hmaster可以实现hmaster之间的failover,或对宕机的hregionserver中的hregion集合的修复(将它们分配给其他的hregionserver)。zookeeper集群本身使用一致性协议(paxos协议)保证每个节点状态的一致性。

how the components work together

zookeeper协调集群所有节点的共享信息,在hmaster和hregionserver连接到zookeeper后创建ephemeral节点,并使用heartbeat机制维持这个节点的存活状态,如果某个ephemeral节点实效,则hmaster会收到通知,并做相应的处理。

另外,hmaster通过监听zookeeper中的ephemeral节点(默认:/hbase/rs/*)来监控hregionserver的加入和宕机。在第一个hmaster连接到zookeeper时会创建ephemeral节点(默认:/hbasae/master)来表示active的hmaster,其后加进来的hmaster则监听该ephemeral节点,如果当前active的hmaster宕机,则该节点消失,因而其他hmaster得到通知,而将自身转换成active的hmaster,在变为active的hmaster之前,它会创建在/hbase/back-masters/下创建自己的ephemeral节点。

hbase的第一次读写

在hbase 0.96以前,hbase有两个特殊的table:-root-和.meta.(如中的设计),其中-root- table的位置存储在zookeeper,它存储了.meta. table的regioninfo信息,并且它只能存在一个hregion,而.meta. table则存储了用户table的regioninfo信息,它可以被切分成多个hregion,因而对第一次访问用户table时,首先从zookeeper中读取-root- table所在hregionserver;然后从该hregionserver中根据请求的tablename,rowkey读取.meta. table所在hregionserver;最后从该hregionserver中读取.meta. table的内容而获取此次请求需要访问的hregion所在的位置,然后访问该hregionsever获取请求的数据,这需要三次请求才能找到用户table所在的位置,然后第四次请求开始获取真正的数据。当然为了提升性能,客户端会缓存-root- table位置以及-root-/.meta. table的内容。如下图所示:

可是即使客户端有缓存,在初始阶段需要三次请求才能直到用户table真正所在的位置也是性能低下的,而且真的有必要支持那么多的hregion吗?或许对google这样的公司来说是需要的,但是对一般的集群来说好像并没有这个必要。在bigtable的论文中说,每行metadata存储1kb左右数据,中等大小的tablet(hregion)在128mb左右,3层位置的schema设计可以支持2^34个tablet(hregion)。即使去掉-root- table,也还可以支持2^17(131072)个hregion, 如果每个hregion还是128mb,那就是16tb,这个貌似不够大,但是现在的hregion的最大大小都会设置的比较大,比如我们设置了2gb,此时支持的大小则变成了4pb,对一般的集群来说已经够了,因而在hbase 0.96以后去掉了-root- table,只剩下这个特殊的目录表叫做meta table(hbase:meta),它存储了集群中所有用户hregion的位置信息,而zookeeper的节点中(/hbase/meta-region-server)存储的则直接是这个meta table的位置,并且这个meta table如以前的-root- table一样是不可split的。这样,客户端在第一次访问用户table的流程就变成了:
  1. 从zookeeper(/hbase/meta-region-server)中获取hbase:meta的位置(hregionserver的位置),缓存该位置信息。
  2. 从hregionserver中查询用户table对应请求的rowkey所在的hregionserver,缓存该位置信息。
  3. 从查询到hregionserver中读取row。
从这个过程中,我们发现客户会缓存这些位置信息,然而第二步它只是缓存当前rowkey对应的hregion的位置,因而如果下一个要查的rowkey不在同一个hregion中,则需要继续查询hbase:meta所在的hregion,然而随着时间的推移,客户端缓存的位置信息越来越多,以至于不需要再次查找hbase:meta table的信息,除非某个hregion因为宕机或split被移动,此时需要重新查询并且更新缓存。

hbase:meta表

hbase:meta表存储了所有用户hregion的位置信息,它的rowkey是:tablename,regionstartkey,regionid,replicaid等,它只有info列族,这个列族包含三个列,他们分别是:info:regioninfo列是regioninfo的proto格式:regionid,tablename,startkey,endkey,offline,split,replicaid;info:server格式:hregionserver对应的server:port;info:serverstartcode格式是hregionserver的启动时间戳。

hregionserver详解

hregionserver一般和datanode在同一台机器上运行,实现数据的本地性。hregionserver包含多个hregion,由wal(hlog)、blockcache、memstore、hfile组成。
  1. wal即write ahead log,在早期版本中称为hlog,它是hdfs上的一个文件,如其名字所表示的,所有写操作都会先保证将数据写入这个log文件后,才会真正更新memstore,最后写入hfile中。采用这种模式,可以保证hregionserver宕机后,我们依然可以从该log文件中读取数据,replay所有的操作,而不至于数据丢失。这个log文件会定期roll出新的文件而删除旧的文件(那些已持久化到hfile中的log可以删除)。wal文件存储在/hbase/wals/${hregionserver_name}的目录中(在0.94之前,存储在/hbase/.logs/目录中),一般一个hregionserver只有一个wal实例,也就是说一个hregionserver的所有wal写都是串行的(就像log4j的日志写也是串行的),这当然会引起性能问题,因而在hbase 1.0之后,通过实现了多个wal并行写(multiwal),该实现采用hdfs的多个管道写,以单个hregion为单位。关于wal可以参考wikipedia的。顺便吐槽一句,英文版的维基百科竟然能毫无压力的正常访问了,这是某个gfw的疏忽还是以后的常态?
  2. blockcache是一个读缓存,即“引用局部性”原理(也应用于cpu,,空间局部性是指cpu在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。hbase中提供两种blockcache的实现:默认on-heap lrublockcache和bucketcache(通常是off-heap)。通常bucketcache的性能要差于lrublockcache,然而由于gc的影响,lrublockcache的延迟会变的不稳定,而bucketcache由于是自己管理blockcache,而不需要gc,因而它的延迟通常比较稳定,这也是有些时候需要选用bucketcache的原因。这篇文章对on-heap和off-heap的blockcache做了详细的比较。
  3. hregion是一个table中的一个region在一个hregionserver中的表达。一个table可以有一个或多个region,他们可以在一个相同的hregionserver上,也可以分布在不同的hregionserver上,一个hregionserver可以有多个hregion,他们分别属于不同的table。hregion由多个store(hstore)构成,每个hstore对应了一个table在这个hregion中的一个column family,即每个column family就是一个集中的存储单元,因而最好将具有相近io特性的column存储在一个column family,以实现高效读取(数据局部性原理,可以提高缓存的命中率)。hstore是hbase中存储的核心,它实现了读写hdfs功能,一个hstore由一个memstore 和0个或多个storefile组成。
    1. memstore是一个写缓存(in memory sorted buffer),所有数据的写在完成wal日志写后,会 写入memstore中,由memstore根据一定的算法将数据flush到地层hdfs文件中(hfile),通常每个hregion中的每个 column family有一个自己的memstore。
    2. hfile(storefile) 用于存储hbase的数据(cell/keyvalue)。在hfile中的数据是按rowkey、column family、column排序,对相同的cell(即这三个值都一样),则按timestamp倒序排列。

虽然上面这张图展现的是最新的hregionserver的架构(但是并不是那么的精确),但是我一直比较喜欢看以下这张图,即使它展现的应该是0.94以前的架构。

hregionserver中数据写流程图解

当客户端发起一个put请求时,首先它从hbase:meta表中查出该put数据最终需要去的hregionserver。然后客户端将put请求发送给相应的hregionserver,在hregionserver中它首先会将该put操作写入wal日志文件中(flush到磁盘中)。

写完wal日志文件后,hregionserver根据put中的tablename和rowkey找到对应的hregion,并根据column family找到对应的hstore,并将put写入到该hstore的memstore中。此时写成功,并返回通知客户端。

memstore flush

memstore是一个in memory sorted buffer,在每个hstore中都有一个memstore,即它是一个hregion的一个column family对应一个实例。它的排列顺序以rowkey、column family、column的顺序以及timestamp的倒序,如下所示:

每一次put/delete请求都是先写入到memstore中,当memstore满后会flush成一个新的storefile(底层实现是hfile),即一个hstore(column family)可以有0个或多个storefile(hfile)。有以下三种情况可以触发memstore的flush动作,需要注意的是memstore的最小flush单元是hregion而不是单个memstore。据说这是column family有个数限制的其中一个原因,估计是因为太多的column family一起flush会引起性能问题?具体原因有待考证。
  1. 当一个hregion中的所有memstore的大小总和超过了hbase.hregion.memstore.flush.size的大小,默认128mb。此时当前的hregion中所有的memstore会flush到hdfs中。
  2. 当全局memstore的大小超过了hbase.regionserver.global.memstore.upperlimit的大小,默认40%的内存使用量。此时当前hregionserver中所有hregion中的memstore都会flush到hdfs中,flush顺序是memstore大小的倒序(一个hregion中所有memstore总和作为该hregion的memstore的大小还是选取最大的memstore作为参考?有待考证),直到总体的memstore使用量低于hbase.regionserver.global.memstore.lowerlimit,默认38%的内存使用量。
  3. 当前hregionserver中wal的大小超过了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前hregionserver中所有hregion中的memstore都会flush到hdfs中,flush使用时间顺序,最早的memstore先flush直到wal的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs。说这两个相乘的默认大小是2gb,查代码,hbase.regionserver.max.logs默认值是32,而hbase.regionserver.hlog.blocksize是hdfs的默认blocksize,32mb。但不管怎么样,因为这个大小超过限制引起的flush不是一件好事,可能引起长时间的延迟,因而这篇文章给的建议:“hint: keep hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs just a bit above hbase.regionserver.global.memstore.lowerlimit * hbase_heapsize.”。并且需要注意,给的描述是有错的(虽然它是官方的文档)。
在memstore flush过程中,还会在尾部追加一些meta数据,其中就包括flush时最大的wal sequence值,以告诉hbase这个storefile写入的最新数据的序列,那么在recover时就直到从哪里开始。在hregion启动时,这个sequence会被读取,并取最大的作为下一次更新时的起始sequence。

hfile格式

hbase的数据以keyvalue(cell)的形式顺序的存储在hfile中,在memstore的flush过程中生成hfile,由于memstore中存储的cell遵循相同的排列顺序,因而flush过程是顺序写,我们直到磁盘的顺序写性能很高,因为不需要不停的移动磁盘指针。

hfile参考bigtable的sstable和hadoop的实现,从hbase开始到现在,hfile经历了三个版本,其中v2在0.92引入,v3在0.98引入。首先我们来看一下v1的格式:

v1的hfile由多个data block、meta block、fileinfo、data index、meta index、trailer组成,其中data block是hbase的最小存储单元,在前文中提到的blockcache就是基于data block的缓存的。一个data block由一个魔数和一系列的keyvalue(cell)组成,魔数是一个随机的数字,用于表示这是一个data block类型,以快速监测这个data block的格式,防止数据的破坏。data block的大小可以在创建column family时设置(hcolumndescriptor.setblocksize()),默认值是64kb,大号的block有利于顺序scan,小号block利于随机查询,因而需要权衡。meta块是可选的,fileinfo是固定长度的块,它纪录了文件的一些meta信息,例如:avg_key_len, avg_value_len, last_key, comparator, max_seq_id_key等。data index和meta index纪录了每个data块和meta块的其实点、未压缩时大小、key(起始rowkey?)等。trailer纪录了fileinfo、data index、meta index块的起始位置,data index和meta index索引的数量等。其中fileinfo和trailer是固定长度的。

hfile里面的每个keyvalue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。我们来看看里面的具体结构:

开始是两个固定长度的数值,分别表示key的长度和value的长度。紧接着是key,开始是固定长度的数值,表示rowkey的长度,紧接着是 rowkey,然后是固定长度的数值,表示family的长度,然后是family,接着是qualifier,然后是两个固定长度的数值,表示time stamp和key type(put/delete)。value部分没有这么复杂的结构,就是纯粹的二进制数据了。随着hfile版本迁移,keyvalue(cell)的格式并未发生太多变化,只是在v3版本,尾部添加了一个可选的tag数组

hfilev1版本的在实际使用过程中发现它占用内存多,并且bloom file和block index会变的很大,而引起启动时间变长。其中每个hfile的bloom filter可以增长到100mb,这在查询时会引起性能问题,因为每次查询时需要加载并查询bloom filter,100mb的bloom filer会引起很大的延迟;另一个,block index在一个hregionserver可能会增长到总共6gb,hregionserver在启动时需要先加载所有这些block index,因而增加了启动时间。为了解决这些问题,在0.92版本中引入hfilev2版本:

在这个版本中,block index和bloom filter添加到了data block中间,而这种设计同时也减少了写的内存使用量;另外,为了提升启动速度,在这个版本中还引入了延迟读的功能,即在hfile真正被使用时才对其进行解析。

filev3版本基本和v2版本相比,并没有太大的改变,它在keyvalue(cell)层面上添加了tag数组的支持;并在fileinfo结构中添加了和tag相关的两个字段。关于具体hfile格式演化介绍,可以参考。

对hfilev2格式具体分析,它是一个多层的类b 树索引,采用这种设计,可以实现查找不需要读取整个文件:

data block中的cell都是升序排列,每个block都有它自己的leaf-index,每个block的最后一个key被放入intermediate-index中,root-index指向intermediate-index。在hfile的末尾还有bloom filter用于快速定位那么没有在某个data block中的row;timerange信息用于给那些使用时间查询的参考。在hfile打开时,这些索引信息都被加载并保存在内存中,以增加以后的读取性能。

这篇就先写到这里,未完待续。。。。

参考:

https://www.mapr.com/blog/in-depth-look-hbase-architecture#.vdnsn6yp3qx
http://jimbojw.com/wiki/index.php?title=understanding_hbase_and_bigtable
http://hbase.apache.org/book.html
http://www.searchtb.com/2011/01/understanding-hbase.html
http://research.google.com/archive/bigtable-osdi06.pdf

dlevin 2015-08-22 17:44 发表评论
]]>
log4j引起的程序“装死”http://www.blogjava.net/dlevin/archive/2015/08/13/426751.htmldlevindlevinthu, 13 aug 2015 08:28:00 gmthttp://www.blogjava.net/dlevin/archive/2015/08/13/426751.htmlhttp://www.blogjava.net/dlevin/comments/426751.htmlhttp://www.blogjava.net/dlevin/archive/2015/08/13/426751.html#feedback4http://www.blogjava.net/dlevin/comments/commentrss/426751.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/426751.html问题起因 依然是在使用gemfire的集群中,我们发现偶尔会出现一些gemfire的function执行特别慢,并且超过了两分钟(为了保证数据的一致性,我们在写之前需要先拿一个lock,因为不能每个key都对应一个lock,因而我们使用了guava的stripe lock(关于stripe lock可以参考这里),而且这个lock本身我们指定了2分钟的超时时间,因而如果写超过两分钟,我们就会收到exception)。这个问题其实已经困扰了我们好几年了,刚前段时间,我们发现长时间的stop-the-world gc会引起这个问题,而且这种时候很多时候会引起那个节点从集群中退出,并不是所有的这种错误都有gc的问题,我特地查了gc的日志,有些这种写超过两分钟的情况下,gc一直处于非常健康的状态,而且查了gemfire的日志和我们自己的日志,也没有发现任何异常。由于我们每个数据保留两分份拷贝,也就是说每次数据写都要写两个节点,两分钟对cpu来说可以做太多的事情,因而只有io才能在某些时候产生这种问题,在问题发生的时候也没有任何overflow数据,而且本地操作,即使对io来说2分钟也是一个非常长的时间了,因而我们只能怀疑这是写另一个节点引起的,对另一个节点,它是在同一个data center中,而且基本是在同一个chasis内部,因而它们之间小于1m的数据量通信也不太可能花去2分钟的时间,所以剩下的我们就只能怀疑网络的问题了,比如数据丢包、网络抖动、网络流量太大一起传输变慢等,但是我们没有找到任何相关的问题。所以我们很长一段时间素手无策,只能怪gemfire闭源,我们不知道这两分钟是不是gemfire自己内部在做一些不为人知的事情,因而太忙了而每来得及处理我们的写请求。虽然我一直觉得不管在处理什么炒作,两分钟都没有响应根本无法解释的通,更何况gemfire节点之间并没有报告有任何异常,或者像以前发现的一个节点向locator举报另一个节点没有响应的问题,locator自己也能很正常的向那个节点发送新的成员信息(view),因而看起来向是这个节点虽然花了两分钟多来写一个数据,但是它还是有响应的,有点“假死”的赶脚。

问题发现

这个问题这么几年以来时不时的就会发生,而且因为以前花的时间太多了,而且也没有找到任何出错的地方,现在索性不去花太多时间在上面了,更何况这个它很长时间才发生一次,并且今年以来就一直没发生过,直到前几周出现一次,我有点不信邪的重新去看这个问题,依然没有找到任何可疑的地方,gc日志、应用程序日志、gemfire自己的日志、网络、cpu使用情况等所有的都是正常的,除了问题发生的那个时刻,应用程序没有任何日志,另外在问题发生之前出现过log4j日志文件的rolling(我们使用rollingfileappender,并且只保留20个日志文件),但是log4j日志文件roll的日志出现了断结,在开始要roll到真正完成roll中间还有几行gemfire自身的日志,此时我并没有觉得这个是有很大问题的,因为我始终觉得log4j除了它自己提到平均对性能有10%的影响以外,它就是一个简单的把日志写到文件的过程,不会影响的整个应用程序本身,因为它太简单了,直到今天这个问题再次出现,依然没有任何其他方面的收获,所有的地方都显示正常状态,甚至我们之前发现的网卡问题今天也没有发生,然而同样是出问题的两分钟没有出现应用程序日志,日志文件roll的日志和上次类似,开始roll到结束出现gemfire日志的交叉。
最近一次发生的日志
[info 2015/08/12 01:56:07.736 bst …] clienthealthmonitor: registering client with member id …
log4j: rolling over count=20971801
log4j: maxbackupindex=20
[info 2015/08/12 01:56:12.265 bst …] clienthealthmonitor: unregistering client with member id …
……
[info 2015/08/12 01:56:23.773 bst …] clienthealthmonitor: registering client with member id …
log4j: renaming file logs/….log.19 to logs/….log.20
一周前发生的日志
[info 2015/08/04 01:43:45.761 bst …] clienthealthmonitor: registering client with member id …
log4j: rolling over count=20971665
log4j: maxbackupindex=20
……
[info 2015/08/04 01:45:25.506 bst …] clienthealthmonitor: registering client with member id …
log4j: renaming file logs/….log.19 to logs/….log.20
看似这个是一个规律(套用同事的一句话:一次发生时偶然,两次发生就是科学了)。然而此时我其实依然不太相信log4j是“凶手”,因为我一直觉得log4j是一个简单的日志输出框架,它要是出问题也只是它自己的问题,是局部的,而这个问题的出现明显是全局的,直到我突然脑子一闪而过,日志打印的操作是synchronized,也就是说在日志文件roll的时候,所有其它需要打日志的线程都要等待直到roll完成,如果这个roll过程超过了2分钟,那么就会发生我们看到的stripe lock超时,也就是发生了程序“假死”的状态。重新查看log4j打印日志的方法调用栈,它会在两个地方用synchronized,即同一个category(logger)类实例:
    public void callappenders(loggingevent event) {
        int writes = 0;
        for(category c = this; c != null; c=c.parent) {
            // protected against simultaneous call to addappender, removeappender,
            synchronized(c) {
                if(c.aai != null) {
                    writes  = c.aai.appendlooponappenders(event);
                }
                if(!c.additive) {
                    break;
                }
            }
        }
。。。
    }
以及同一个appender在doapppend时:
    public synchronized void doappend(loggingevent event) {
      。。。
      this.append(event);
    }
而roll的过程就是在append方法中,进一步分析,在下面两句话之间,他们分别花费了超过100s和超过11s的时间:
log4j: maxbackupindex=20
。。。
log4j: renaming file logs/….log.19 to logs/….log.20
而这两句之间只包含了两个file.exists(),一个file.delete(),一个file.rename()操作:
    public void rollover() {
      。。。
      if(maxbackupindex > 0) {
        // delete the oldest file, to keep windows happy.
        file = new file(filename   '.'   maxbackupindex);
        if (file.exists())
            renamesucceeded = file.delete();
        for (int i = maxbackupindex - 1; i >= 1 && renamesucceeded; i--) {
            file = new file(filename   "."   i);
            if (file.exists()) {
                target = new file(filename   '.'   (i   1));
                loglog.debug("renaming file "   file   " to "   target);
                renamesucceeded = file.renameto(target);
            }
        }
      。。。
      }
    }

nfs简单性能测试和分析

因而我对nfs的性能作了一些简单测试:
只有一个线程时,在nfs下rename性能:
1 file:                    3ms
10 files:                48ms
20 files:                114ms
相比较,在本地磁盘rename的性能:
1 file:                    1ms
3 files:                  1ms
10 files:                3ms
对nfs和本地磁盘写的性能(模拟日志,每行都会flush):

 

nfs

local

1 writer, 11m

443ms

238ms

1 writer, 101m

2793ms

992ms

10 writers, 11m

~4400ms

~950ms

10 writers, 101m

~30157ms

~5500ms


一些其他的统计:
100同时写:
create 20 files spend: 301ms
renaming 20 files spends: 333ms
delete 20 files spends: 329ms
1000同时写:
create 20 files spend: 40145ms
renaming 20 files spends: 39273ms
而在1000个同时写的过程中,重命名:
rename file: logtest1.50 take: 36434ms
rename file: logtest1.51 take: 39ms
rename file: logtest1.52 take: 34ms
也就是说在这个模拟过程中,一个文件的rename超过36s,而向我们有十几台机器同时使用相同的nfs,并且每台机器上都跑二三十个程序,如果那段时间同时有上万个的日志写,可以预计达到100s情况是可能发生的。
关于nfs性能的问题,在《构建高性能web站点》的书(330页)中也有涉及。简单的介绍,nfs由sun在1984年开发,是主流异构平台实现文件共享的首选方案。它并没有自己的传输协议,而是使用rpc(remote procedure call)协议(应用层),rpc协议默认底层基于udp传输,但是自己实现在丢包时的重传机制,而且nfs服务器采用多进程模型,默认进程为4,但是一般都会调优增加服务进程数,然而“不管怎么对nfs进行性能优化,nfs注定不适合作为i/o密集型文件共享方案,但可以作为一般用途,比如提供站点内部的资源共享,它的优势在于容易搭建,而且可以减少不必要的数据冗余。”
可以使用命令:“nfsstat -c”获取对nfs服务器的操作的简单统计,具体可以参考《构建高性能web站点》的相关章节,里面还有更详细的对nfs服务器性能的测试。

总结

从这个事件我总结了两件事情:
1. 日志的影响可能是全局性的,因而要非常小心,一个耗时的操作可能引起程序的“假死”,因而要非常小心。
2. 虽然把日志打印在nfs上,对大量的日志文件查找会方便很多,但是这是一个很耗性能的设计,特别是当大量的程序共享这个nfs的时候,因而要尽量避免。


dlevin 2015-08-13 16:28 发表评论
]]>实现自己的lock对象http://www.blogjava.net/dlevin/archive/2015/08/11/426723.htmldlevindlevinmon, 10 aug 2015 22:08:00 gmthttp://www.blogjava.net/dlevin/archive/2015/08/11/426723.htmlhttp://www.blogjava.net/dlevin/comments/426723.htmlhttp://www.blogjava.net/dlevin/archive/2015/08/11/426723.html#feedback0http://www.blogjava.net/dlevin/comments/commentrss/426723.htmlhttp://www.blogjava.net/dlevin/services/trackbacks/426723.html一直想好好学习concurrent包中的各个类的实现,然而经常看了一点就因为其他事情干扰而放下了。发现这样太不利于自己的成长了,因而最近打算潜心一件一件的完成自己想学习的东西。

对concurrent包的学习打算先从lock的实现开始,因而自然而然的就端起了abstractqueuedsynchronizer,然而要读懂这个类的源码并不是那么容易,因而我就开始问自己一个问题:如果自己要去实现这个一个lock对象,应该如何实现呢?

要实现lock对象,首先理解什么是锁?我自己从编程角度简单的理解,所谓锁对象(互斥锁)就是它能保证一次只有一个线程能进入它保护的临界区,如果有一个线程已经拿到锁对象,那么其他对象必须让权等待,而在该线程退出这个临界区时需要唤醒等待列表中的其他线程。更学术一些,中对同步机制准则的归纳(p50):
  1. 空闲让进。当无进程处于临界区时,表明临界资源处于空闲状态,应允许一个请求进入临界区的进程立即进入自己的临界区,以有效的利用临界资源。
  2. 忙则等待。当已有进程进入临界区时,表明临界资源正在被访问,因而其他试图进入临界区的进程必须等待,以保证对临界区资源的互斥访问。
  3. 有限等待。对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区,以免陷入“死等”状态。
  4. 让权等待。当进程不能进入自己的临界区时,应该释放处理机,以免进程陷入“忙等”状态。

说了那么多,其实对互斥锁很简单,只需要一个标记位,如果该标记位为0,表示没有被占用,因而直接获得锁,然后把该标记位置为1,此时其他线程发现该标记位已经是1,因而需要等待。这里对这个标记位的比较并设值必须是原子操作,而在jdk5以后提供的atomic包里的工具类可以很方便的提供这个原子操作。然而上面的四个准则应该漏了一点,即释放锁的线程(进程)和得到锁的线程(进程)应该是同一个,就像一把钥匙对应一把锁(理想的),所以一个非常简单的lock类可以这么实现:

public class spinlockv1 {
    
private final atomicinteger state = new atomicinteger(0);
    
private volatile thread owner; // 这里owner字段可能存在中间值,不可靠,因而其他线程不可以依赖这个字段的值
    
    
public void lock() {
        
while (!state.compareandset(01)) { }
        owner 
= thread.currentthread();
    }
    
    
public void unlock() {
        thread currentthread 
= thread.currentthread();
        
if (owner != currentthread || !state.compareandset(10)) {
            
throw new illegalstateexception("the lock is not owned by thread: "  currentthread);
        }
        owner 
= null;
    }
}

一个简单的测试方法:

    @test
    
public void testlockcorrectly() throws interruptedexception {
        
final int count = 100;
        thread[] threads 
= new thread[count];
        spinlockv1 lock 
= new spinlockv1();
        addrunner runner 
= new addrunner(lock);
        
for (int i = 0; i < count; i) { 
            threads[i] 
= new thread(runner, "thread-"  i);
            threads[i].start();
        }
        
        
for (int i = 0; i < count; i) {
            threads[i].join();
        }
        
        assertequals(count, runner.getstate());
    }
    
    
private static class addrunner implements runnable {
        
private final spinlockv1 lock;
        
private int state = 0;

        
public addrunner(spinlockv1 lock) {
            
this.lock = lock;
        }
        
        
public void run() {
            lock.lock();
            
try {
                quietsleep(
10);
                state
;
                system.out.println(thread.currentthread().getname() 
 ""  state);
            } 
finally {
                lock.unlock();
            }
        }
        
        
public int getstate() {
            
return state;
        }
    }

然而这个spinlock其实并不需要state这个字段,因为owner的赋值与否也是一种状态,因而可以用它作为一种互斥状态:

public class spinlockv2 {
    
private final atomicreference<thread> owner = new atomicreference<thread>(null);
    
    
public void lock() {
        
final thread currentthread = thread.currentthread();
        
while (!owner.compareandset(null, currentthread)) { }
    }
    
    
public void unlock() {
        thread currentthread 
= thread.currentthread();
        
if (!owner.compareandset(currentthread, null)) {
            
throw new illegalstateexception("the lock is not owned by thread: "  currentthread);
        }
    }
}

这在操作系统中被定义为整形信号量,然而整形信号量如果没拿到锁会一直处于“忙等”状态(没有遵循有限等待和让权等待的准则),因而这种锁也叫spin lock,在短暂的等待中它可以提升性能,因为可以减少线程的切换,concurrent包中的atomic大部分都采用这种机制实现,然而如果需要长时间的等待,“忙等”会占用不必要的cpu时间,从而性能会变的很差,这个时候就需要将没有拿到锁的线程放到等待列表中,这种方式在操作系统中也叫记录型信号量,它遵循了让权等待准则(当前没有实现有限等待准则)。在jdk6以后提供了locksupport.park()/locksupport.unpark()操作,可以将当前线程放入一个等待列表或将一个线程从这个等待列表中唤醒。然而这个park/unpark的等待列表是一个全局的等待列表,在unpartk的时候还是需要提供需要唤醒的thread对象,因而我们需要维护自己的等待列表,但是如果我们可以用jdk提供的工具类concurrentlinkedqueue,就非常容易实现,如locksupport文档中给出来的:

class fifomutex {
   
private final atomicboolean locked = new atomicboolean(false);
   
private final queue<thread> waiters = new concurrentlinkedqueue<thread>();

   
public void lock() {
     
boolean wasinterrupted = false;
     thread current 
= thread.currentthread();
     waiters.add(current);

     
// block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||!locked.compareandset(falsetrue)) {
        locksupport.park(
this);
        
if (thread.interrupted()) // ignore interrupts while waiting
          wasinterrupted = true;
     }

     waiters.remove();
     
if (wasinterrupted)          // reassert interrupt status on exit
        current.interrupt();
   }

   
public void unlock() {
     locked.set(
false);
     locksupport.unpark(waiters.peek());
   }
 }

在该代码事例中,有一个线程等待队列和锁标记字段,每次调用lock时先将当前线程放入这个等待队列中,然后拿出队列头线程对象,如果该线程对象正好是当前线程,并且成功 使用cas方式设置locked字段(这里需要两个同时满足,因为可能出现一个线程已经从队列中移除了但还没有unlock,此时另一个线程调用lock方法,此时队列头的线程就是第二个线程,然而由于第一个线程还没有unlock或者正在unlock,因而需要使用cas原子操作来判断是否要park),表示该线程竞争成功,获得锁,否则将当前线程park,这里之所以要放在 while循环中,因为park操作可能无理由返回(spuriously),如文档中给出的描述:

locksupport.park()
public static void park( blocker)
disables the current thread for thread scheduling purposes unless the permit is available.

if the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

  • some other thread invokes with the current thread as the target; or
  • some other thread the current thread; or
  • the call spuriously (that is, for no reason) returns.

this method does not report which of these caused the method to return. callers should re-check the conditions which caused the thread to park in the first place. callers may also determine, for example, the interrupt status of the thread upon return.

parameters:
blocker - the synchronization object responsible for this thread parking
since:
1.6
我在实现自己的类时就被这个“无理由返回”坑了好久。对于已经获得锁的线程,将该线程从等待队列中移除,这里由于concurrentlinkedqueue是线程安全的,因而能保证每次都是队列头的线程得到锁,因而在得到锁匙将队列头移除。unlock逻辑比较简单,只需要将locked字段打开(设置为false),唤醒(unpark)队列头的线程即可,然后该线程会继续在lock方法的while循环中继续竞争unlocked字段,并将它自己从线程队列中移除表示获得锁成功。当然安全起见,最好在unlock中加入一些验证逻辑,如解锁的线程和加锁的线程需要相同。

然而本文的目的是自己实现一个lock对象,即只使用一些基本的操作,而不使用jdk提供的atomic类和concurrentlinkedqueue。类似的首先我们也需要一个队列存放等待线程队列(公平起见,使用先进先出队列),因而先定义一个node对象用以构成这个队列:

 

    protected static class node {
        
volatile thread owner;
        
volatile node prev;
        
volatile node next;
        
        
public node(thread owner) {
            
this.owner = owner;
            
this.state = init;
        }
        
        
public node() {
            
this(thread.currentthread());
        }
    }

简单起见,队列头是一个起点的placeholder,每个调用lock的线程都先将自己竞争放入这个队列尾,每个队列头后一个线程(node)即是获得锁的线程,所以我们需要有head node字段用以快速获取队列头的后一个node,而tail node字段用来快速插入新的node,所以关键在于如何线程安全的构建这个队列,方法还是一样的,使用cas操作,即cas方法将自己设置成tail值,然后重新构建这个列表:

    protected boolean enqueue(node node) {
        
while (true) {
            
final node pretail = tail;
            node.prev 
= pretail;
            
if (compareandsettail(pretail, node)) {
                pretail.next 
= node;
                
return node.prev == head;
            }
        }
    }

在当前线程node以线程安全的方式放入这个队列后,lock实现相对就比较简单了,如果当前node是的前驱是head,该线程获得锁,否则park当前线程,处理park无理由返回的问题,因而将park放入while循环中(该实现是一个不可重入的实现):

    public void lock() {
        
// put the latest node to a queue first, then check if the it is the first node
        
// this way, the list is the only shared resource to deal with
        node node = new node();
        
if (enqueue(node)) {
            current 
= node.owner;
        } 
else {
            
while (node.prev != head) {
                locksupport.park(
this); // this may return "spuriously"!!, so put it to while
            }

            current 
= node.owner;
        }
    }

unlock的实现需要考虑多种情况,如果当前node(head.next)有后驱,那么直接unpark该后驱即可;如果没有,表示当前已经没有其他线程在等待队列中,然而在这个判断过程中可能会有其他线程进入,因而需要用cas的方式设置tail,如果设置失败,表示此时有其他线程进入,因而需要将该新进入的线程unpark从而该新进入的线程在调用park后可以立即返回(这里的cas和enqueue的cas都是对tail操作,因而能保证状态一致):

    public void unlock() {
        node curnode 
= unlockvalidate();
        node next 
= curnode.next;
        
if (next != null) {
           
head.next = next;
            next.prev 
= head;
            locksupport.unpark(next.owner);
        } 
else {
            
if (!compareandsettail(curnode, head)) {
               
while (curnode.next == null) { } // wait until the next available
                // another node queued during the time, so we have to unlock that, or else, this node can never unparked
                unlock();
            } 
else {
               
compareandsetnext(head, curnode, null); // still use cas here as the head.next may already been changed
            }
        }
    }

具体的代码和测试类可以参考查看。


其实直到自己写完这个类后才直到者其实这是一个mcs锁的变种,因而这个实现每个线程park在自身对应的node上,而由前一个线程unpark它;而abstractqueuedsynchronizer是clh锁,因为它的park由前驱状态决定,虽然它也是由前一个线程unpark它。具体可以参考。



dlevin 2015-08-11 06:08 发表评论
]]>
网站地图