本文将针对kafka性能方面进行简单分析,首先简单介绍一下kafka的架构和涉及到的名词:
1. topic:用于划分message的逻辑概念,一个topic可以分布在多个broker上。
2. partition:是kafka中横向扩展和一切并行化的基础,每个topic都至少被切分为1个partition。
3. offset:消息在partition中的编号,编号顺序不跨partition。
4. consumer:用于从broker中取出/消费message。
5. producer:用于往broker中发送/生产message。
6. replication:kafka支持以partition为单位对message进行冗余备份,每个partition都可以配置至少1个replication(当仅1个replication时即仅该partition本身)。
7. leader:每个replication集合中的partition都会选出一个唯一的leader,所有的读写请求都由leader处理。其他replicas从leader处把数据更新同步到本地,过程类似大家熟悉的mysql中的binlog同步。
8. broker:kafka中使用broker来接受producer和consumer的请求,并把message持久化到本地磁盘。每个cluster当中会选举出一个broker来担任controller,负责处理partition的leader选举,协调partition迁移等工作。
9. isr(in-sync replica):是replicas的一个子集,表示目前alive且与leader能够“catch-up”的replicas集合。由于读写都是首先落到leader上,所以一般来说通过同步机制从leader上拉取数据的replica都会和leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该replica踢出isr。每个partition都有它自己独立的isr。
以上几乎是我们在使用kafka的过程中可能遇到的所有名词,同时也无一不是最核心的概念或组件,感觉到从设计本身来说,kafka还是足够简洁的。这次本文围绕kafka优异的吞吐性能,逐个介绍一下其设计与实现当中所使用的各项“黑科技”。
broker
不同于redis和memcacheq等内存消息队列,kafka的设计是把所有的message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,kafka使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。
首先,说“规规矩矩”是因为kafka在磁盘上只做sequence i/o,由于消息系统读写的特殊性,这并不存在什么问题。关于磁盘i/o的性能,引用一组kafka官方给出的测试数据(raid-5,7200rpm):
sequence i/o: 600mb/s
random i/o: 100kb/s
所以通过只做sequence i/o的限制,规避了磁盘访问速度低下对性能可能造成的影响。
接下来我们再聊一聊kafka是如何“抄近道的”。
首先,kafka重度依赖底层操作系统提供的pagecache功能。当上层有写操作时,操作系统只是将数据写入pagecache,同时标记page属性为dirty。当读操作发生时,先从pagecache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上pagecache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收pagecache的代价又很小,所以现代的os都支持pagecache。
使用pagecache功能同时可以避免在jvm内部缓存数据,jvm为我们提供了强大的gc能力,同时也引入了一些问题不适用与kafka的设计。
· 如果在heap内管理缓存,jvm的gc线程会频繁扫描heap空间,带来不必要的开销。如果heap过大,执行一次full gc对系统的可用性来说将是极大的挑战。
· 所有在在jvm内的对象都不免带有一个object overhead(千万不可小视),内存的有效空间利用率会因此降低。
· 所有的in-process cache在os中都有一份同样的pagecache。所以通过将缓存只放在pagecache,可以至少让可用缓存空间翻倍。
· 如果kafka重启,所有的in-process cache都会失效,而os管理的pagecache依然可以继续使用。
pagecache还只是第一步,kafka为了进一步的优化性能还采用了sendfile技术。在解释sendfile之前,首先介绍一下传统的网络i/o操作流程,大体上分为以下4步。
1. os 从硬盘把数据读到内核区的pagecache。
2. 用户进程把数据从内核区copy到用户区。
3. 然后用户进程再把数据写入到socket,数据流入内核区的socket buffer上。
4. os 再把数据从buffer中copy到网卡的buffer上,这样完成一次发送。
整个过程共经历两次context switch,四次system call。同一份数据在内核buffer与用户buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是sendfile所解决的问题,经过sendfile优化后,整个i/o过程就变成了下面这个样子。
通过以上的介绍不难看出,kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果producer和consumer之间生产和消费进度上配合得当,完全可以实现数据交换零i/o。这也就是我为什么说kafka使用“硬盘”并没有带来过多性能损失的原因。下面是我在生产环境中采到的一些指标。
(20 brokers, 75 partitions per broker, 110k msg/s)
此时的集群只有写,没有读操作。10m/s左右的send的流量是partition之间进行replicate而产生的。从recv和writ的速率比较可以看出,写盘是使用asynchronous batch的方式,底层os可能还会进行磁盘写顺序优化。而在有read request进来的时候分为两种情况,第一种是内存中完成数据交换。
send流量从平均10m/s增加到了到平均60m/s,而磁盘read只有不超过50kb/s。pagecache降低磁盘i/o效果非常明显。
接下来是读一些收到了一段时间,已经从内存中被换出刷写到磁盘上的老数据。
其他指标还是老样子,而磁盘read已经飚高到40 mb/s。此时全部的数据都已经是走硬盘了(对硬盘的顺序读取os层会进行prefill pagecache的优化)。依然没有任何性能问题。
tips
1. kafka官方并不建议通过broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。
2. 可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。
a. 脏页率超过第一个指标会启动pdflush开始flush dirty pagecache。
b. 脏页率超过第二个指标会阻塞所有的写操作来进行flush。
c. 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。
partition
partition是kafka可以很好的横向扩展和提供高并发处理以及实现replication的基础。
扩展性方面。首先,kafka允许partition在集群内的broker之间任意移动,以此来均衡可能存在的数据倾斜问题。其次,partition支持自定义的分区算法,例如可以将同一个key的所有消息都路由到同一个partition上去。 同时leader也可以在in-sync的replica中迁移。由于针对某一个partition的所有读写请求都是只由leader来处理,所以kafka会尽量把leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。
并发方面。任意partition在某一个时刻只能被一个consumer group内的一个consumer消费(反过来一个consumer则可以同时消费多个partition),kafka非常简洁的offset机制最小化了broker和consumer之间的交互,这使kafka并不会像同类其他消息队列一样,随着下游consumer数目的增加而成比例的降低性能。此外,如果多个consumer恰巧都是消费时间序上很相近的数据,可以达到很高的pagecache命中率,因而kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。
不过,partition的数量并不是越多越好,partition的数量越多,平均到每一个broker上的数量也就越多。考虑到broker宕机(network failure, full gc)的情况下,需要由controller来为所有宕机的broker上的所有partition重新选举leader,假设每个partition的选举消耗10ms,如果broker上有500个partition,那么在进行选举的5s的时间里,对上述partition的读写操作都会触发leadernotavailableexception。
再进一步,如果挂掉的broker是整个集群的controller,那么首先要进行的是重新任命一个broker作为controller。新任命的controller要从zookeeper上获取所有partition的meta信息,获取每个信息大概3-5ms,那么如果有10000个partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个controller花费的时间,在这基础上还要再加上前面说的选举leader的时间 -_-!!!!!!
此外,在broker端,对producer和consumer都使用了buffer机制。其中buffer的大小是统一配置的,数量则与partition个数相同。如果partition个数过多,会导致producer和consumer的buffer内存占用过大。
tips
1. partition的数量尽量提前预分配,虽然可以在后期动态增加partition,但是会冒着可能破坏message key和partition之间对应关系的风险。
2. replica的数量不要过多,如果条件允许尽量把replica集合内的partition分别调整到不同的rack。
3. 尽一切努力保证每次停broker时都可以clean shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。
producer
kafka的研发团队表示在0.8版本里用java重写了整个producer,据说性能有了很大提升。我还没有亲自对比试用过,这里就不做数据对比了。本文结尾的扩展阅读里提到了一套我认为比较好的对照组,有兴趣的同学可以尝试一下。
其实在producer端的优化大部分消息系统采取的方式都比较单一,无非也就化零为整、同步变异步这么几种。
kafka系统默认支持messageset,把多条message自动地打成一个group后发送出去,均摊后拉低了每次通信的rtt。而且在组织messageset的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。
此外,还要着重介绍的一点是,producer支持end-to-end的压缩。数据在本地压缩后放到网络上传输,在broker一般不解压(除非指定要deep-iteration),直至消息被consume之后在客户端解压。
当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟kafka目前支持的压缩算法有限,只有gzip和snappy),不过这样做反而会意外的降低效率!!!! kafka的end-to-end压缩与messageset配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。
不过kafka采用messageset也导致在可用性上一定程度的妥协。每次发送数据时,producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的messageset当中,尚未发送到网络,这时候如果producer挂掉,那就会出现丢数据的情况。
为了解决这个问题,kafka在0.8版本的设计借鉴了网络当中的ack机制。如果对性能要求较高,又能在一定程度上允许message的丢失,那就可以设置request.required.acks=0 来关闭ack,以全速发送。如果需要对发送的消息进行确认,就需要设置request.required.acks为1或-1,那么1和-1又有什么区别呢?这里又要提到前面聊的有关replica数量问题。如果配置为1,表示消息只需要被leader接收并确认即可,其他的replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低。如果设置为-1,表示消息要commit到该partition的isr集合中的所有replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着replica的数量正比增长,这里就需要根据不同的需求做相应的优化。
tips
1. producer的线程不要配置过多,尤其是在mirror或者migration中使用的时候,会加剧目标集群partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。
2. 0.8版本的request.required.acks默认是0(同0.7)。
consumer
consumer端的设计大体上还算是比较常规的。
· 通过consumer group,可以支持生产者消费者和队列访问两种模式。
· consumer api分为high level和low level两种。前一种重度依赖zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(leader迁移、offset越界、broker宕机等)和offset的维护都需要自行处理。
· 大家可以关注下不日发布的0.9 release。开发人员又用java重写了一套consumer。把两套api合并在一起,同时去掉了对zookeeper的依赖。据说性能有大幅度提升哦~~
tips
强烈推荐使用low level api,虽然繁琐一些,但是目前只有这个api可以对error数据进行自定义处理,尤其是处理broker异常或由于unclean shutdown导致的corrupted data时,否则无法skip只能等着“坏消息”在broker上被rotate掉,在此期间该replica将会一直处于不可用状态。
扩展阅读
sendfile: https://www.ibm.com/developerworks/cn/java/j-zerocopy/
so what’s wrong with 1975 programming: https://www.varnish-cache.org/trac/wiki/architectnotes
benchmarking: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines