Kafka 几个实现细节
更新日期:
Kafka 几个实现细节
关于Kafka大方向上的介绍已经很多了,infoq上面不少不错的资源
http://www.oschina.net/translate/kafka-design?cmp&p=1#
http://www.infoq.com/cn/articles/kafka-analysis-part-1
http://www.infoq.com/cn/articles/kafka-analysis-part-2
http://www.infoq.com/cn/articles/kafka-analysis-part-3
http://www.infoq.com/cn/articles/kafka-analysis-part-4
主要想从几个细节出发简单写一下Kafka,也为自己做一些积累。
使用场景
进程间交互的方式有很多,其中包括共享存储/进程间通信。消息队列则是进程间通信实现的一种方式。
关于消息队列的实现有很多,ActiveMQ,OpenMQ,RabbitMQ,RocketMQ。看名字也可以看出,很多MQ更多集中精力解决速度问题:『快』。
但事实上,很多MQ都有各种各样的性能问题,比如,很多MQ都主要使用内存对消息进行存储。当缓存很多消息时,有可能出现较多性能问题。
Kafka的出现在一定程度上解决了此类问题,虽然Kafka仍然是以一个消息队列的形式存在,但其实现已经远超一个简单地消息队列。
其主要特点包括
- 使用file system进行存储
- 支持数据多副本
- 支持多节点负载均衡
- 基本已经实现exactly once语义
基本概念
下面的概念中有部分逻辑概念,部分实体概念。
Broker
物理概念,指服务于Kafka的一个node。
topic
MQ中的抽象概念,是一个消费标示。用于保证Producer以及Consumer能够通过该标示进行对接。可以理解为一种Naming方式。
partition
Topic的一个子概念,一个topic可具有多个partition,但Partition一定属于一个topic。
值得注意的是:
- 在实现上都是以每个Partition为基本实现单元的。
- 消费时,每个消费线程最多只能使用一个partition。
- 一个topic中partition的数量,就是每个user group中消费该topic的最大并行度数量。
User group
为了便于实现MQ中的多播,重复消费等引入的概念。如果ConsumerA以及ConsumerB同在一个UserGroup,那么ConsumerA消费的数据ConsumerB就无法消费了。
即:所有usergroup中的consumer使用一套offset。
Offset
Offset专指Partition以及User Group而言,记录某个user group在某个partiton中当前已经消费到达的位置。
总结
Kafka使用了Topic以及Partition的概念。其中Partition隶属于Topic,即topic1可以具有多个partition。而Partition则是Consumer消费的基本单元,即topic1有几个partition,那么最多就可以有多少个consumer同时在一个User Group里消费这个topic。而Offset则是记录了UserGroup在每个partiton中的偏移值。
概念介绍:生产者,消费者,消费语义
- 生产者
生产者直接向某topic的某partition发送数据。leader负责主备策略,写入数据,发送ack。
- 消费者
消费者使用fetch的方式拉取数据。kafkaServer不直接负责每个consumer的当前消费到了哪里,所以需要client端和zk联合维护每个partition读到了哪里,即Offset。
所以这样看上去,kafkaServer在一定程度上更像是一个大部分为顺序读取的,基于文件的日志系统。
因为简单,所以稳定。
- 消费语义
对比其他MQ的多播,等语义,Kafka看上去略显单薄,其主要通过User Group的概念实现消费语义。而UserGroup实际对应的就是Offset的更改策略。
User1,User2同属一个userGroup时,即表示二者共用一套Offset。因每个partition 的offset只能由一个线程维护,因此注定了每个UserGroup里只能有一个消费线程对一个partition进行消费。
同样,如果希望实现多播,那就User1和User2用两个userGroup。
Kafka实现细节1 :Server端的日志存储
Kafka因为采用顺序写+无状态的方式,将可靠性发挥到了极致,使得Kafka成为了一集消息缓存以及MQ于一身的利器。首先第一个问题是搞清楚:Kafka内部存储日志的方式。
我们知道Partition是Topic的实体,所以当Producer向某topic发送数据时,需要判定几个问题。
问题1:发到哪个partition,谁来定?
这种问题没有正确的答案,只有到底在牺牲谁的答案。
在目前0.8.2.1的Kafka中,是交由Producer来解决这个问题的,Producer中有个PartitionManager专门用于负责对每个Message分配partition,或者由使用者更改。
- 优势
这样的优势在于Kafka Server不需要单独一个LoadBalancer来决定消息去哪里。而且Producer完全可以根据partition的id在ZK里寻找当前Leader,直接与Leader建立连接。
- 劣势
是不是看到这里发现问题了?是的,如果某个Partition完全不可用,这些消息就无法发送了。使用更加简化的模型带来的代价是牺牲了一部分可用性。
当然再有了副本策略之后,使一个partition变得不可用是一件很困难的事情。
问题2:日志如何存储?
在这里,我们先讨论单点存储结构
Kafka Producer在确定partition leader之后开始与其所在的broker通信。为了使用磁盘的顺序写,即使用Log Structure storage。
为查找方便,Kafka同样建立了基本的索引结构。想想查询需求,有什么查询需求?大部分消息都会被顺序读取,当然也会存在少量的随机读取消息(比如处理的时候这条消息处理失败,需要重新处理)。所以索引在这里的意义仅为简单支持少量随机查询。
所以在索引的实现上,基本上就是为了支持针对某个Offset进行二分查找而存在的索引。
所以在文件存储上,每个消息被写成了两部分,一部分是『消息实体』,一部分是『消息索引』。消息实体格式如下:
1 | On-disk format of a message |
消息索引格式如下,官网那张图是错的,你们就别看那个了。
Segement file主要用来搜索offset的时候使用,如果是顺序消费,只需要持续读文件内部内容即可。
问题3:如何实现日志副本&&副本策略&&同步方式
副本问题的提出
日志副本策略是可靠性的核心问题之一,其实现方式也是多种多样的。包括无主模型,通过paxos之类的协议保证消息顺序,但更简单直接的方式是使用主从结构,主决定顺序,从拷贝主的信息。
如果主不挂,从节点没有存在的意义。但主挂了时,我们需要从备份节点中选出一个主。与此同时,更重要的是:保证一致性。在这里一致性是指:
1 | 主ack了的消息,kafka切换主之后,依然可被消费。 |
因此这里产生了一个trade off:Leader应该什么时候ack呢?
这个问题简直是分布式环境里永恒(最坑爹)的主题之一了。其引申出的本质问题是,你到底要什么?
- 要可靠性
当然可以,leader收到消息之后,等follower 返回ok了ack,慢死。但好处是,主挂了,哪个follower都可以做主,大家数据都一样嘛
- 要速度
当然可以,leader收到消息写入本地就ack,然后再发给follower。问题也很显而易见,最坏得情况下,有个消息leader返回ack了,但follower因为各种原因没有写入,主挂了,丢数据了。
副本问题的集中解决方式
我们来集中讨论一下几种实现方式从而明白我们需要哪种策略吧。
- 方式1: Quorum及类似协议
Quorum直译为决议团,即通过写多份,读多次的方式保证单个值是最新值,通俗理解为抽屉原理。
抽屉原理的适用范围很广,Dynamo在某种程度上也是使用了抽屉原理
在Dynamo的使用中,设共有N副本,每次写保证W个副本ack,每次读的时候读R个副本并从中取最新值,则只要保证W+R>N,那么就一定能保证读到最新数据。但那是在Key-Value的存储中使用的,没有数据顺序问题。在Kafka里,我们还需要有一个数据顺序问题。
Kafka中会持续写入数据,主接收数据后,向所有follower发送数据。当然,因为网络问题,每次成功ack的follower可能不完全相同,但可以当有W个节点ack的时候就进行主的ack。
这样,在主挂的时候,需要R个点共同选主,因为W+R>N,所以对于每条消息,R个点里一定是有一个点是写成功的。因此通过这R个点,一定可以拼凑出来一份齐全的,和Leader一样的数据。把这些数据写入单点,即可实现选新主。
当然,这里隐含了一个协议,就是leader每次向follower发送消息的时候是附带了消息编号的,且消息编号自增。里面还有很多实现细节(such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set),因为Kafka没用这种方式实现,所以也就不再复述。
最经典的情况就是R=f+1,W=f+1,N=2f+1
这就是一个典型情况,有leader 以及F1~F4四个follower,每次写入写入leader的同时,保证至少写入f=2额外的点。所以当leader写完所有信息后如果挂掉,从F1~F4里任选三个都可以组合出所有的完整的消息。
优势:
这个方式的优势是,在写入过程中,跳过了部分反应慢的节点。因为要求W+R>N,所以选主速度应该也还可以。劣势:性能较差,拥有2f+1的机器只能支持最多f台机器挂掉。假设我希望支持2台机器挂掉,我就需要5台机器。使用5台机器的存储,但只能存储一台机器的容量,以及一台机器的吞吐,这显然不是一个划算的买卖。
这也就是为什么只有在保证主节点的关键信息时才会使用类似Quorom的实现方式,而对于大量的数据存储并不是使用这种方式。(Dynamo应该算个特例吧)
当然还有一些其他相对类似的实现,比如 ZooKeeper的 Zab, Raft, 以及 Viewstamped Replication。Kafka等人认为最接近他们是实现的是微软亚研院的一篇论文: PacificA(周立东老师的论文….当年就是他电面的我然后直接把我送走了….)
- Kafka自己的ISR机制
Kafka自己使用了一种称之为In-Sync Replicas(ISR)的机制。
我们回想一下刚才Quorum实现里的问题,支持挂2台的环境需要5台机器,主要是比例太高。之前使用Quorum主要是对每个消息都做f+1的备份,即
1 | 以单个消息为进行备份的基本单位,进行可靠性保障 |
在这种情况下,为了保证每个消息的可靠,所以我们只有一个选择,那就是写够f+1份数据。因为只有这样,才能保障一个f+1份的读可以获取全部数据。其主要问题在于每次ack的机器不一样。所以,找f+1份才会保险。
但是想想,使用kafka是为了高吞吐,每个机器上数据不全显然需要多点读,但我们可不可以让节点在ack之后,自己慢慢补上自己缺失的数据呢?这样读数据的时候就可以读单点了啊!
事实上,在Quorum默认的实现方式里,节点是不再进行数据交互的。也对,输入数据量那么大,每个消息持有者都不相同,进行数据交互补充自己不含有的数据可能会带来很大的网络开销,而且存储同样是问题。难道也要学dynamo用gossip协议?想想就头疼,能不能把问题简化一下呢?
不适合直接用gossip协议的原因是消息数量太大,但不用Gossip协议必然导致多份读,这对于高吞吐的kafka是不能容忍的。是不是可以换一种思路呢?比如:
1 | 以一段时间而非以一个消息为基本单位,进行可靠性保障 |
个人认为这是ISR机制最核心的思想。
这同样是基于一个节点故障模型的假设:
1 | 对大多数系统而言,其正常工作状态与异常工作状态成时间段分布。 |
所以,如果存在一种方式,能够按照时间段进行ack,再进行gossip就会变得简单很多。因为不会有多少个gossip消息传来传去。
下面介绍ISR机制
- 主挂掉的时候,直接从ISR里选一个当主。
- 挂掉的主启动后,查看自己是不是主,不是主,当从。
- ISR里记录了当前跟主一致的从节点,因此每次主收到消息后,需要等到ISR里所有机器ack了这个消息后才能对client进行ack。
- 应该有探测机制动态的使从节点加入或者离开ISR,但文档没说。
问题6: Kafka的主挂掉的情况讨论
该挂了就挂了好了,你都把所有的点都挂了,还搞什么可靠性??
Kafka实现细节2:新玩法.Log Compaction
//TODO
Kafka实现细节3:消费怎么保证不丢数据?
Kafka的高吞吐很大程度上得益于其放弃了对消费者offset的维护,而是放由消费者自行维护。
因此在消费者看来,kafka更像是一个专业顺序存储工具,而非一个消息队列。
问题1: Offset怎么存?
在Simple Consumer看来:我不管,你爱咋才能咋存。反正你让我读哪里,我就读哪里。
同事,Kafka提供了一种可以参考的方式的Offset存储方式。如果使用High-level Consumer,则可采用这种方式。
这种方式最有意思之处,是它用producer和consumer实现了一套可靠消息的Consumer,方式如下:
- 对于每个UserGroup,Kafka会生成一个Offset Manager用于Handle所有partition的offset。Manager实际上通过一个内建的compacted topic(叫做consumer)
- 所有Consumer都要发送其offset到offset topic。
- 当Consumer要消费时,先去offset topic取出最新的对应消息,然后消费。
问题2: Consumer如何做loadbalance?
Consumer没法做load balance,你读数据,那就来主读,主挂了,再换别的机器。
至于说为什么不去备读?因为partition哪里都是啊,你的备虽然是备份机,但那台机器上还有别的partition在做主,也有读取压力,因此让副本也进行读取其实是毫无意义的,不会增加任何吞吐,只会导致系统变得更加复杂。
问题3: Consumer的关闭异常,会不会存在Offset异常导致多消费或者少消费?
事实上是的。每当Consumer被异常重启时,有一定几率会有一部分数据被重复消费,或者被跳过。
重复数据的数量取决于Consumer同步的频率。比如:
- Consumer每1k条消息进行一次消息同步,consumer消费到43k时,又消费了300条,然后跪了。
- Consumer Manager只记录到了43k。
- Consumer重启了,读自己UserGroup里Consumer Manger在该partition最新一个消息:43k。再读一下自己UserGroup里数据量:43.3k。
- 这时,他有两个选择:
- 1.按照43k的offset继续读,那么之前的300条消息被重复消费了。
- 2.按照43.3读,那么有可能那300条消息之前没消费,所以对了300条。
当然会问,会不会有Exactly Once语义?
答案是,你自己来实现吧。
只要保证consumer里留一些缓存,缓冲一批消息之后,做一个transaction:
- 向后端发送数据的同时
- 向producer发送汇报
即可保证Exactly Once语义。