文章目錄
  1. 1. Kafka 几个实现细节
    1. 1.1. 使用场景
    2. 1.2. 基本概念
      1. 1.2.1. Broker
      2. 1.2.2. topic
      3. 1.2.3. partition
      4. 1.2.4. User group
      5. 1.2.5. Offset
      6. 1.2.6. 总结
    3. 1.3. 概念介绍:生产者,消费者,消费语义
    4. 1.4. Kafka实现细节1 :Server端的日志存储
      1. 1.4.1. 问题1:发到哪个partition,谁来定?
      2. 1.4.2. 问题2:日志如何存储?
      3. 1.4.3. 问题3:如何实现日志副本&&副本策略&&同步方式
        1. 1.4.3.1. 副本问题的提出
        2. 1.4.3.2. 副本问题的集中解决方式
      4. 1.4.4. 问题6: Kafka的主挂掉的情况讨论
    5. 1.5. Kafka实现细节2:新玩法.Log Compaction
    6. 1.6. Kafka实现细节3:消费怎么保证不丢数据?
      1. 1.6.1. 问题1: Offset怎么存?
      2. 1.6.2. 问题2: Consumer如何做loadbalance?
      3. 1.6.3. 问题3: Consumer的关闭异常,会不会存在Offset异常导致多消费或者少消费?

Kafka 几个实现细节

关于Kafka大方向上的介绍已经很多了,infoq上面不少不错的资源

http://www.oschina.net/translate/kafka-design?cmp&p=1#

http://www.infoq.com/cn/articles/kafka-analysis-part-1

http://www.infoq.com/cn/articles/kafka-analysis-part-2

http://www.infoq.com/cn/articles/kafka-analysis-part-3

http://www.infoq.com/cn/articles/kafka-analysis-part-4

主要想从几个细节出发简单写一下Kafka,也为自己做一些积累。

使用场景

进程间交互的方式有很多,其中包括共享存储/进程间通信。消息队列则是进程间通信实现的一种方式。

关于消息队列的实现有很多,ActiveMQ,OpenMQ,RabbitMQ,RocketMQ。看名字也可以看出,很多MQ更多集中精力解决速度问题:『快』。

但事实上,很多MQ都有各种各样的性能问题,比如,很多MQ都主要使用内存对消息进行存储。当缓存很多消息时,有可能出现较多性能问题。

Kafka的出现在一定程度上解决了此类问题,虽然Kafka仍然是以一个消息队列的形式存在,但其实现已经远超一个简单地消息队列。

其主要特点包括

  • 使用file system进行存储
  • 支持数据多副本
  • 支持多节点负载均衡
  • 基本已经实现exactly once语义

基本概念

下面的概念中有部分逻辑概念,部分实体概念。

Broker

物理概念,指服务于Kafka的一个node。

topic

MQ中的抽象概念,是一个消费标示。用于保证Producer以及Consumer能够通过该标示进行对接。可以理解为一种Naming方式。

partition

Topic的一个子概念,一个topic可具有多个partition,但Partition一定属于一个topic。

值得注意的是:

  • 在实现上都是以每个Partition为基本实现单元的。
  • 消费时,每个消费线程最多只能使用一个partition。
  • 一个topic中partition的数量,就是每个user group中消费该topic的最大并行度数量。

User group

为了便于实现MQ中的多播,重复消费等引入的概念。如果ConsumerA以及ConsumerB同在一个UserGroup,那么ConsumerA消费的数据ConsumerB就无法消费了。

即:所有usergroup中的consumer使用一套offset。

Offset

Offset专指Partition以及User Group而言,记录某个user group在某个partiton中当前已经消费到达的位置。

总结

Kafka使用了Topic以及Partition的概念。其中Partition隶属于Topic,即topic1可以具有多个partition。而Partition则是Consumer消费的基本单元,即topic1有几个partition,那么最多就可以有多少个consumer同时在一个User Group里消费这个topic。而Offset则是记录了UserGroup在每个partiton中的偏移值。

概念介绍:生产者,消费者,消费语义

  • 生产者

生产者直接向某topic的某partition发送数据。leader负责主备策略,写入数据,发送ack。

  • 消费者

消费者使用fetch的方式拉取数据。kafkaServer不直接负责每个consumer的当前消费到了哪里,所以需要client端和zk联合维护每个partition读到了哪里,即Offset。

所以这样看上去,kafkaServer在一定程度上更像是一个大部分为顺序读取的,基于文件的日志系统。

因为简单,所以稳定。
  • 消费语义

对比其他MQ的多播,等语义,Kafka看上去略显单薄,其主要通过User Group的概念实现消费语义。而UserGroup实际对应的就是Offset的更改策略。

User1,User2同属一个userGroup时,即表示二者共用一套Offset。因每个partition 的offset只能由一个线程维护,因此注定了每个UserGroup里只能有一个消费线程对一个partition进行消费。

同样,如果希望实现多播,那就User1和User2用两个userGroup。

Kafka实现细节1 :Server端的日志存储

Kafka因为采用顺序写+无状态的方式,将可靠性发挥到了极致,使得Kafka成为了一集消息缓存以及MQ于一身的利器。首先第一个问题是搞清楚:Kafka内部存储日志的方式。

我们知道Partition是Topic的实体,所以当Producer向某topic发送数据时,需要判定几个问题。

问题1:发到哪个partition,谁来定?

这种问题没有正确的答案,只有到底在牺牲谁的答案。

在目前0.8.2.1的Kafka中,是交由Producer来解决这个问题的,Producer中有个PartitionManager专门用于负责对每个Message分配partition,或者由使用者更改。

  • 优势

这样的优势在于Kafka Server不需要单独一个LoadBalancer来决定消息去哪里。而且Producer完全可以根据partition的id在ZK里寻找当前Leader,直接与Leader建立连接。

  • 劣势

是不是看到这里发现问题了?是的,如果某个Partition完全不可用,这些消息就无法发送了。使用更加简化的模型带来的代价是牺牲了一部分可用性。

当然再有了副本策略之后,使一个partition变得不可用是一件很困难的事情。

问题2:日志如何存储?

在这里,我们先讨论单点存储结构

Kafka Producer在确定partition leader之后开始与其所在的broker通信。为了使用磁盘的顺序写,即使用Log Structure storage。

为查找方便,Kafka同样建立了基本的索引结构。想想查询需求,有什么查询需求?大部分消息都会被顺序读取,当然也会存在少量的随机读取消息(比如处理的时候这条消息处理失败,需要重新处理)。所以索引在这里的意义仅为简单支持少量随机查询。

所以在索引的实现上,基本上就是为了支持针对某个Offset进行二分查找而存在的索引。

所以在文件存储上,每个消息被写成了两部分,一部分是『消息实体』,一部分是『消息索引』。消息实体格式如下:

1
2
3
4
5
6
On-disk format of a message

message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes

消息索引格式如下,官网那张图是错的,你们就别看那个了。

Segement file主要用来搜索offset的时候使用,如果是顺序消费,只需要持续读文件内部内容即可。

kafka_message

问题3:如何实现日志副本&&副本策略&&同步方式

副本问题的提出

日志副本策略是可靠性的核心问题之一,其实现方式也是多种多样的。包括无主模型,通过paxos之类的协议保证消息顺序,但更简单直接的方式是使用主从结构,主决定顺序,从拷贝主的信息。

如果主不挂,从节点没有存在的意义。但主挂了时,我们需要从备份节点中选出一个主。与此同时,更重要的是:保证一致性。在这里一致性是指:

1
2
主ack了的消息,kafka切换主之后,依然可被消费。
主没有ack的消息,kafka切换主之后,依然没有被存储。

因此这里产生了一个trade off:Leader应该什么时候ack呢?

这个问题简直是分布式环境里永恒(最坑爹)的主题之一了。其引申出的本质问题是,你到底要什么?

  • 要可靠性

当然可以,leader收到消息之后,等follower 返回ok了ack,慢死。但好处是,主挂了,哪个follower都可以做主,大家数据都一样嘛

  • 要速度

当然可以,leader收到消息写入本地就ack,然后再发给follower。问题也很显而易见,最坏得情况下,有个消息leader返回ack了,但follower因为各种原因没有写入,主挂了,丢数据了。

副本问题的集中解决方式

我们来集中讨论一下几种实现方式从而明白我们需要哪种策略吧。

  • 方式1: Quorum及类似协议

Quorum直译为决议团,即通过写多份,读多次的方式保证单个值是最新值,通俗理解为抽屉原理。

抽屉原理的适用范围很广,Dynamo在某种程度上也是使用了抽屉原理

在Dynamo的使用中,设共有N副本,每次写保证W个副本ack,每次读的时候读R个副本并从中取最新值,则只要保证W+R>N,那么就一定能保证读到最新数据。但那是在Key-Value的存储中使用的,没有数据顺序问题。在Kafka里,我们还需要有一个数据顺序问题。

Kafka中会持续写入数据,主接收数据后,向所有follower发送数据。当然,因为网络问题,每次成功ack的follower可能不完全相同,但可以当有W个节点ack的时候就进行主的ack。

这样,在主挂的时候,需要R个点共同选主,因为W+R>N,所以对于每条消息,R个点里一定是有一个点是写成功的。因此通过这R个点,一定可以拼凑出来一份齐全的,和Leader一样的数据。把这些数据写入单点,即可实现选新主。

当然,这里隐含了一个协议,就是leader每次向follower发送消息的时候是附带了消息编号的,且消息编号自增。里面还有很多实现细节(such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set),因为Kafka没用这种方式实现,所以也就不再复述。

最经典的情况就是R=f+1,W=f+1,N=2f+1

ISR1

这就是一个典型情况,有leader 以及F1~F4四个follower,每次写入写入leader的同时,保证至少写入f=2额外的点。所以当leader写完所有信息后如果挂掉,从F1~F4里任选三个都可以组合出所有的完整的消息。

  1. 优势:
    这个方式的优势是,在写入过程中,跳过了部分反应慢的节点。因为要求W+R>N,所以选主速度应该也还可以。

  2. 劣势:性能较差,拥有2f+1的机器只能支持最多f台机器挂掉。假设我希望支持2台机器挂掉,我就需要5台机器。使用5台机器的存储,但只能存储一台机器的容量,以及一台机器的吞吐,这显然不是一个划算的买卖。

这也就是为什么只有在保证主节点的关键信息时才会使用类似Quorom的实现方式,而对于大量的数据存储并不是使用这种方式。(Dynamo应该算个特例吧)

当然还有一些其他相对类似的实现,比如 ZooKeeper的 Zab, Raft, 以及 Viewstamped Replication。Kafka等人认为最接近他们是实现的是微软亚研院的一篇论文: PacificA(周立东老师的论文….当年就是他电面的我然后直接把我送走了….)

  • Kafka自己的ISR机制

Kafka自己使用了一种称之为In-Sync Replicas(ISR)的机制。

我们回想一下刚才Quorum实现里的问题,支持挂2台的环境需要5台机器,主要是比例太高。之前使用Quorum主要是对每个消息都做f+1的备份,即

1
以单个消息为进行备份的基本单位,进行可靠性保障

在这种情况下,为了保证每个消息的可靠,所以我们只有一个选择,那就是写够f+1份数据。因为只有这样,才能保障一个f+1份的读可以获取全部数据。其主要问题在于每次ack的机器不一样。所以,找f+1份才会保险。

但是想想,使用kafka是为了高吞吐,每个机器上数据不全显然需要多点读,但我们可不可以让节点在ack之后,自己慢慢补上自己缺失的数据呢?这样读数据的时候就可以读单点了啊!

事实上,在Quorum默认的实现方式里,节点是不再进行数据交互的。也对,输入数据量那么大,每个消息持有者都不相同,进行数据交互补充自己不含有的数据可能会带来很大的网络开销,而且存储同样是问题。难道也要学dynamo用gossip协议?想想就头疼,能不能把问题简化一下呢?

不适合直接用gossip协议的原因是消息数量太大,但不用Gossip协议必然导致多份读,这对于高吞吐的kafka是不能容忍的。是不是可以换一种思路呢?比如:

1
以一段时间而非以一个消息为基本单位,进行可靠性保障

个人认为这是ISR机制最核心的思想。

这同样是基于一个节点故障模型的假设:

1
对大多数系统而言,其正常工作状态与异常工作状态成时间段分布。

所以,如果存在一种方式,能够按照时间段进行ack,再进行gossip就会变得简单很多。因为不会有多少个gossip消息传来传去。

下面介绍ISR机制

  • 主挂掉的时候,直接从ISR里选一个当主。
  • 挂掉的主启动后,查看自己是不是主,不是主,当从。
  • ISR里记录了当前跟主一致的从节点,因此每次主收到消息后,需要等到ISR里所有机器ack了这个消息后才能对client进行ack。
  • 应该有探测机制动态的使从节点加入或者离开ISR,但文档没说。

问题6: Kafka的主挂掉的情况讨论

该挂了就挂了好了,你都把所有的点都挂了,还搞什么可靠性??

Kafka实现细节2:新玩法.Log Compaction

//TODO

Kafka实现细节3:消费怎么保证不丢数据?

Kafka的高吞吐很大程度上得益于其放弃了对消费者offset的维护,而是放由消费者自行维护。

因此在消费者看来,kafka更像是一个专业顺序存储工具,而非一个消息队列。

问题1: Offset怎么存?

在Simple Consumer看来:我不管,你爱咋才能咋存。反正你让我读哪里,我就读哪里。

同事,Kafka提供了一种可以参考的方式的Offset存储方式。如果使用High-level Consumer,则可采用这种方式。

这种方式最有意思之处,是它用producer和consumer实现了一套可靠消息的Consumer,方式如下:

  • 对于每个UserGroup,Kafka会生成一个Offset Manager用于Handle所有partition的offset。Manager实际上通过一个内建的compacted topic(叫做consumer
  • 所有Consumer都要发送其offset到offset topic。
  • 当Consumer要消费时,先去offset topic取出最新的对应消息,然后消费。

问题2: Consumer如何做loadbalance?

Consumer没法做load balance,你读数据,那就来主读,主挂了,再换别的机器。

至于说为什么不去备读?因为partition哪里都是啊,你的备虽然是备份机,但那台机器上还有别的partition在做主,也有读取压力,因此让副本也进行读取其实是毫无意义的,不会增加任何吞吐,只会导致系统变得更加复杂。

问题3: Consumer的关闭异常,会不会存在Offset异常导致多消费或者少消费?

事实上是的。每当Consumer被异常重启时,有一定几率会有一部分数据被重复消费,或者被跳过。

重复数据的数量取决于Consumer同步的频率。比如:

  • Consumer每1k条消息进行一次消息同步,consumer消费到43k时,又消费了300条,然后跪了。
  • Consumer Manager只记录到了43k。
  • Consumer重启了,读自己UserGroup里Consumer Manger在该partition最新一个消息:43k。再读一下自己UserGroup里数据量:43.3k。
  • 这时,他有两个选择:
  • 1.按照43k的offset继续读,那么之前的300条消息被重复消费了。
  • 2.按照43.3读,那么有可能那300条消息之前没消费,所以对了300条。

当然会问,会不会有Exactly Once语义?

答案是,你自己来实现吧。

只要保证consumer里留一些缓存,缓冲一批消息之后,做一个transaction:

  • 向后端发送数据的同时
  • 向producer发送汇报

即可保证Exactly Once语义。

文章目錄
  1. 1. Kafka 几个实现细节
    1. 1.1. 使用场景
    2. 1.2. 基本概念
      1. 1.2.1. Broker
      2. 1.2.2. topic
      3. 1.2.3. partition
      4. 1.2.4. User group
      5. 1.2.5. Offset
      6. 1.2.6. 总结
    3. 1.3. 概念介绍:生产者,消费者,消费语义
    4. 1.4. Kafka实现细节1 :Server端的日志存储
      1. 1.4.1. 问题1:发到哪个partition,谁来定?
      2. 1.4.2. 问题2:日志如何存储?
      3. 1.4.3. 问题3:如何实现日志副本&&副本策略&&同步方式
        1. 1.4.3.1. 副本问题的提出
        2. 1.4.3.2. 副本问题的集中解决方式
      4. 1.4.4. 问题6: Kafka的主挂掉的情况讨论
    5. 1.5. Kafka实现细节2:新玩法.Log Compaction
    6. 1.6. Kafka实现细节3:消费怎么保证不丢数据?
      1. 1.6.1. 问题1: Offset怎么存?
      2. 1.6.2. 问题2: Consumer如何做loadbalance?
      3. 1.6.3. 问题3: Consumer的关闭异常,会不会存在Offset异常导致多消费或者少消费?