消费者消费流程

消费流程:

  1. 从zk获取要消费的partition 的leader的位置 以及 offset位置
  2. 拉数据,这里拉数据是直接从broker的pagecash拉取,零拷贝(应用程序可以直接把磁盘中的数据从内核中,直接传输到socket,不用互相拷贝) ,所以很快。
  3. 如果pagecash数据不全,就会从磁盘中拉取,并发送
  4. 消费完成后,可以手动提交offset,也可以自动提交offset。

如何避免重复消费?

分析原因:

1.生产者重复提交 2.rebalence引起重复消费

超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案:

1.提高消费速度

  • 增加消费者
  • 多线程消费
  • 异步消费
  • 调整消费处理时间

2.幂等处理

  • 消费者设置幂等校验
  • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能。
Kafka 线上消息积压怎么解决
  1. 消费端拿到的消息并发消耗掉
  2. 转发到一个新的队列
kafka 消息重复消费?怎么解决?
  1. 如果发送端使用了重试机制,由于网络原因没有收到发送成功的 ACK
  2. 消费者手动提交 offset,拉取一批数据,没有执行完但是服务宕机,这部分会再次拉出来执行
    解决方法:幂等处理,自动提交
Kafka 消息丢失?怎么解决?

学习 kafka 呢需要明确几个概念
生产者发送消息到 broker 中某一个 topic 的具体分区里,消费者从一个或多个分区中拉取数据进行消费

这里整理了 Kafka 的一些关键术语:

  • Producer:生产者,消息产生和发送端。
  • Broker:Kafka 实例,多个 broker 组成一个 Kafka 集群,通常一台机器部署一个 Kafka 实例,一个实例挂了不影响其他实例。
  • Consumer:消费者,拉取消息进行消费。 一个 topic 可以让若干个消费者进行消费,若干个消费者组成一个 Consumer Group 即消费组,一条消息只能被消费组中一个 Consumer 消费。
  • Topic:主题,服务端消息的逻辑存储单元。一个 topic 通常包含若干个 Partition 分区。
  • Partition:topic 的分区,分布式存储在各个 broker 中, 实现发布与订阅的负载均衡。若干个分区可以被若干个 Consumer 同时消费,达到消费者高吞吐量。一个分区拥有多个副本(Replica),这是Kafka在可靠性和可用性方面的设计,后面会重点介绍。
  • message:消息,或称日志消息,是 Kafka 服务端实际存储的数据,每一条消息都由一个 key、一个 value 以及消息时间戳 timestamp 组成。
  • offset:偏移量,分区中的消息位置,由 Kafka 自身维护,Consumer 消费时也要保存一份 offset 以维护消费过的消息位置。

消费队列呢一般用来实现 同步到异步的转换,削峰,解耦 等目标

kafka 设计特性
  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka 方案及其痛点

之前,我们采用 Apache Kafka 作为消息平台, 为了让业务在高峰期(晚上八点到十点)不受影响,我们根据消息业务量的大小, 分别搭建了不同的集群。对于一些业务场景的需求, 比如需要重置 offset 来消费过去几天的消息,使用 Kafka 需要停掉消费者才可以进行, 这种方式对大量在线业务非常不利,只能采用重写消息或者一些不太灵活的方式来实现, 极大降低了使用体验。

我们在使用 Kafka 集群过程中,主要遇到以下问题:

  1. Kafka 没有租户概念,需要手动维护多个集群,不方便运维。
  2. Kafka 集群扩容后需要做 Reassign Partitions,IO 消耗大。
  3. Kafka 监控体系不完善,排查问题较为繁琐。
  4. 在线业务消息重置不方便,实现起来较为麻烦,需要停掉消费组。
  5. Kafka 不支持死信队列和延迟队列。
  6. Kafka 没有官方维护和支持的 Go 语言客户端。
  7. 在 Kafka 中支持 schema,需要引入额外组件,不方便维护。

为什么选择 Pulsar

  • Pulsar 采用云原生的架构,存储和计算分离。
  • Pulsar 支持多租户,我们可以按照不同的业务线、业务小组和对应的服务级别来管理消息保存时间、持久化、堆积清除策略等,统一维护一套 Pulsar 集群。
  • Pulsar 支持灵活的水平扩容。当存储不够时,直接增加 bookie 进行扩容,不会对用户产生任何影响。
  • Pulsar 自带监控体系,broker,bookie 相关指标清晰,方便快速定位问题,给出解决方案。
  • Pulsar cursor 方便重置消息,给业务带来很好的体验。
  • Pulsar 支持死信队列和延迟队列。
  • Pulsar schema 集成在 broker 中,不需要引入单独的组件。Golang client 支持 schema,减少了维护成本。

1 什么是kafka

Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。

2 为什么要使用 kafka,为什么要使用消息队列

缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

3.Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么

ISR:In-Sync Replicas 副本同步队列 AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

4.kafka中的broker 是干什么的

broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。

5.kafka中的 zookeeper 起到什么作用,可以不用zookeeper么

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖, 但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。

6.kafka follower如何与leader同步数据

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。

7.什么情况下一个 broker 会从 isr中踢出去

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。

8.kafka 为什么那么快

  • Cache Filesystem Cache PageCache缓存
  • 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
  • Zero-copy 零拷技术减少拷贝次数
  • Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
  • Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。

9.kafka producer如何优化打入速度

  • 增加线程
  • 提高 batch.size
  • 增加更多 producer 实例
  • 增加 partition 数
  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
  • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

10.kafka producer 打数据,ack 为 0, 1, -1 的时候代表啥, 设置 -1 的时候,什么情况下,leader 会认为一条消息 commit了

  1. 1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
  2. 0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  3. -1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。

11.kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响

unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样有可能就会丢数据,spark streaming在消费过程中拿到的 end offset 会突然变小,导致 spark streaming job挂掉。如果unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。

12.如果leader crash时,ISR为空怎么办

kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值: true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。 false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

13.kafka的message格式是什么样的

一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成 header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。 当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性, 比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性 body是由N个字节构成的一个消息体,包含了具体的key/value消息

14.kafka中consumer group 是什么概念

同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

15.Kafka中的消息是否会丢失和重复消费?

要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。 1、消息发送 Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:

  1. 0—表示不进行消息接收是否成功的确认;
  2. 1—表示当Leader接收成功时确认;
  3. -1—表示Leader和Follower都接收成功时确认;

综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景: (1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失; (2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失; 2、消息消费 Kafka消息消费有两个consumer接口,Low-level API和High-level API:

  1. Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
  2. High-level API:封装了对parition和offset的管理,使用简单;

如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“_诡异_”的消失了; 解决办法: 针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态; 针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。 消息重复消费及解决参考:https://www.javazhiyin.com/22910.html

16.为什么Kafka不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。 Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:

  • (1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  • (2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

17.Kafka中是怎么体现消息顺序性的?

kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。 整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.

18.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

19.kafka如何实现延迟队列?

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。 底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask. Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。

Kafka集群leader选举

  1. 在kafka集群中,第一个启动的broker会在zk中创建一个临时节点/controller让自己成为控制器。其他broker启动时也会试着创建这个节点当然他们会失败,因为已经有人创建过了。那么这些节点会在控制器节点上创建zk watch对象,这样他们就可以收到这个节点变更的通知。任何时刻都确保集群中只有一个leader的存在。
  2. 如果控制器被关闭或者与zk断开连接,zk上的KB是节点马上就会消失。那么其他订阅了leader节点的broker也会收到通知随后他们会尝试让自己成为新的leader,重复第一步的操作。
  3. 如果leader完好但是别的broker离开了集群,那么leader会去确定离开的broker的分区并确认新的分区领导者(即分区副本列表里的下一个副本)。然后向所有包含该副本的follower或者observer发送请求。随后新的分区首领开始处理请求。

Kafka创建副本的2种模式——同步复制和异步复制

Kafka动态维护了一个同步状态的副本的集合(a set of In-Sync Replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息只有被这个集合中的每个节点读取并追加到日志中,才会向外部通知说“这个消息已经被提交”。

只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。消息从leader复制到follower,我们可以通过决定Producer是否等待消息被提交的通知(ack)来区分同步复制和异步复制。

同步复制流程

同步复制流程:

  • producer联系zk识别leader;
  • 向leader发送消息;
  • leadr收到消息写入到本地log;
  • follower从leader pull消息;
  • follower向本地写入log;
  • follower向leader发送ack消息;
  • leader收到所有follower的ack消息;
  • leader向producer回传ack。

异步复制流程

异步复制流程: 和同步复制的区别在于,leader写入本地log之后,直接向client回传ack消息,不需要等待所有follower复制完成。 既然kafka支持副本模式,那么其中一个Broker里的挂掉,一个新的leader就能通过ISR机制推选出来,继续处理读写请求。

Kafka判断一个broker节点是否存活

依据两个条件:

  • 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接;
  • 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。

5. 消息

5.1. 请简述一下消息的顺序
  • Kafka保证一个Partition内消息的有序性,但是并不保证多个Partition之间的数据有顺序。 每个Topic可以划分成多个分区( 每个Topic都至少有一个分区),同一Topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset,它是消息在此分区中的唯一编号,Kafka 通过offset保证消息在分区内的顺序, offset 的顺序性不跨分区,即Kafka只保证在同一个分区内的消息是有序的,同一Topic的多个分区内的消息,Kafka并不保证其顺序性
  • kafka消息有序。单分区可以。也可以使用key+多分区。保证同一个 Key 的所有消息都进入到相同的分区里面
  • 防止乱序可以通过设置max.in.flight.requests.per.connection=1来保证
5.2. 如何保证消息的有序
  • 一个生产者,发两次消息,但是网络原因,消息到达的顺序和消息发送的顺序不一致
    • 设置max.in.flight.requests.per.connection=1来保证
5.3. 消息堆积可能原因
  1. 生产速度大于消费速度
    1. 可以适当增加分区,增加consumer数量,提升消费TPS;
  2. consumer消费性能低
    1. 查一下是否有很重的消费逻辑,看看是否可以优化consumer TPS;
  3. 确保consumer端没有因为异常而导致消费hang住;
  4. 如果你使用的是消费者组,确保没有频繁地发生rebalance
5.4. 有哪些情况下会出现生产消息重复
  • 一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍
  • 生产者重复发送。比如说你的业务在发送消息的时候,收到了一个超时响应,这个时候你很难确定这个消息是否真的发送出去了,那么你就会考虑重试,重试就可能导致同一个消息发送了多次。
5.5. 那些情景下会造成消息漏消费
  1. 自动提交 设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
  2. 生产者发送消息 发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
  3. 消费者端 先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理
  4. acks没有设置为all 如果在broker还没把消息同步到其他broker的时候宕机了,那么消息将会丢失
5.6. 有哪些情形会造成重复消费?
  1. Rebalance 一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。
  2. 消费者端手动提交 如果先消费消息,再更新offset位置,导致消息重复消费。
  3. 消费者端自动提交 设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  4. 生产者端 生产者因为业务问题导致的宕机,在重启之后可能数据会重发
5.7. Kafka中是怎么体现消息顺序性的?
  • 可以通过分区策略体现消息顺序性。 分区策略有轮询策略、随机策略、按消息键保序策略。

按消息键保序策略:一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略

5.8. Kafka 中最基本的数据单元是消息 message(Kafka 中的消息理解成数据库里的一条行或者一条记录)
5.9. Kafka中的幂等是怎么实现的
  • 在 Kafka 中,Producer 默认不是幂等性的,可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
  • 底层具体的原理,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了
5.10. 幂等性 Producer 作用范围
  • 它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
5.11. 解释Kafka的用户如何消费信息?
  • 在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节从套接口转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
5.12. Kafka中怎么做消息轨迹
  • 消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程中都会在链路中增加相应的信息,将这些信息汇聚、处理之后就可以查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。同样可以将轨迹信息保存到 Kafka 的某个主题中,比如下图中的主题 trace_topic。生产者在将消息正常发送到用户主题 real_topic 之后(或者消费者在拉取到消息消费之后)会将轨迹信息发送到主题 trace_topic 中。
5.13. Kafka为什么这么快
  • Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。 使用PageCache功能同时可以避免在JVM内部缓存数据,JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。
5.14. kafka为什么不像MySQL那样允许追随者副本对外提供读服务
  • kafka的分区已经让读是从多个broker读从而负载均衡,不是MySQL的主从,压力都在主上
  • kafka保存的数据和数据库的性质有实质的区别就是数据具有消费的概念,是流数据,kafka是消息队列,所以消费需要位移,而数据库是实体数据不存在这个概念,如果从kafka的follower读,消费端offset控制更复杂
  • 生产者来说,kafka可以通过配置来控制是否等待follower对消息确认的,如果从上面读,也需要所有的follower都确认了才可以回复生产者,造成性能下降,如果follower出问题了也不好处理
  • 首先会存在数据一致性的问题,消息从主节点同步到从节点需要时间,可能造成主从节点的数据不一致。主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度
5.15. Producer端,网络,数据格式等因素,会不会导致Kafka只有一个分区接收到数据顺序跟Producer发送数据顺序不一致
  • 如果retries>0并且max.in.flight.requests.per.connection>1有可能出现消息乱序的情况
5.16. replica的leader和follower之间如何复制数据保证消息的持久化的问题
  • follower副本不断地从leader处拉取消息。
  • 生产者消息发过来以后,写leader成功后即告知生产者成功,然后异步的将消息同步给其他follower,这种方式效率最高,但可能丢数据;
  • 同步等待所有follower都复制成功后通知生产者消息发送成功,这样不会丢数据,但效率不高;
5.17. 在partition增加或减少消息路由重新hash的情况下,消息的顺序性不就没有办法保证了。特别是在相同key的情况下,有状态变更顺序要求的场景。不知道对于类似场景有什么好的解决方案
  • 可以自己写个partitioner,让相同的key用于去到相同的分区
5.18. 如果长时间不消费,提交的位移会过期吗?或者提交的位移的数据被清理了,下次启动重新消费时从什么位移开始消费?
  • 提交的位移会过期。一旦被清理,从哪里消费取决于auto.offset.reset参数值
5.19. 异步发送消息,如果retry,是否会造成消息乱序?
  • 是可能的,所以有max.in.flight.requests.per.connection这个参数
5.20. 消息经常堆积起来,不能消费了。大概会有一些什么情况。如何解决
  • 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS
  • consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS
  • 确保consumer端没有因为异常而导致消费hang住
  • 如果你使用的是消费者组,确保没有频繁地发生rebalance
5.21. 如何判定 生产者速度大于消费者
  • 用kafka自带的命令行工具kafka-consumer-groups.sh。可以查看指定消费者组对其消费的所有partition的位移落后情况(也就是业务上的堆积量)。在一段时间内多次使用这个工具查看消费位移落后的情况,如果越来越大,就说明消费慢于生产。
5.22. 在规划消息磁盘的时候会考虑什么
  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩
5.23. Kafka 无消息丢失的配置
  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
  • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
5.24. 如何保证消息的不丢

6. 消费组

  1. 简述消费者与消费组之间的关系
    1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些
    2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
    3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

7. 位移

  1. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
    • 在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。 当前消费者需要提交的消费位移是offset+1
  2. 主题有4个分区,消费者组有2个实例,发布应用的时候,会先新启动一个服务节点,加入消费组,通过重平衡分配到到至少1个最多2个分区,消费者的偏移量是 0 还是啥
    • 假设C1消费P0,P1, C2消费P2,P3。如果C1从未提交,C1挂掉,C2开始消费P0,P1,发现没有对应提交位移,那么按照C2的auto.offset.reset值决定从那里消费,如果是earliest,从P0,P1的最小位移值(可能不是0)开始消费,如果是latest,从P0, P1的最新位移值(分区高水位值)开始消费。但如果C1之前提交了位移,那么C1挂掉之后C2从C1最新一次提交的位移值开始消费。 所谓的重复消费是指,C1消费了一部分数据,还没来得及提交这部分数据的位移就挂了。C2承接过来之后会重新消费这部分数据。
  3. 为什么位移主题写入消息时,不直接替换掉原来的数据,像 HashMap 一样呢?而是要堆积起来,另起线程来维护位移主题
    • 位移主题也是主题,也要遵循Kafka底层的日志设计思路,即append-only log
  4. 位移主题用来记住位移,那么这个位移主题的位移由谁来记住呢?
    • 位移主题的位移由Kafka内部的Coordinator自行管理
  5. 消费者提交的位移消息,保存到位移主题分区是随机的吗?
    • 不是随机的。通常来说,同一个group下的所有消费者提交的位移数据保存在位移主题的同一个分区下

8. rebalance

  1. 如何缩短rebalance时间
    • 减少consumer个数

当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。

另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

  1. 缩短单条消息处理的时间:这个涉及业务流程的优化或者改造,得具体问题具体分析,在 bps 这个场景,暂时没有优化的空间 2. 增加消费者端允许下游系统消费一批消息的最大时长:当消费者组完成重平衡之后,每个消费者实例都会定期地向协调者发送心跳请求,表明它还存活着。如果某个消费者实例不能及时地发送这些心跳请求,协调者就会认为该消费者已经“死”了,从而将其从组中移除,然后开启新一轮重平衡。消费者端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果协调者在 10 秒之内没有收到组内某个消费者实例的心跳,它就会认为这个消费者实例已经挂了。可以这么说,session.timeout.ms 决定了消费者存活性的时间间隔 3. 控制发送心跳请求频率:消费者还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,消费者实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启重平衡 4. 减少下游系统一次性消费的消息总数:这取决于消费者端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段 5. 调整两次调用 poll 方法的最大时间间隔:消费者端还有一个参数,用于控制消费者实际消费能力对重平衡的影响,即 max.poll.interval.ms 参数。它限定了消费者端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的消费者程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么消费者会主动发起“离开组”的请求,协调者也会开启新一轮重平衡 6. 下游系统使用多线程来加速消费:具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka 消费者消费数据更多是单线程的,所以当消费速度无法匹及 Kafka 消费者消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。

  2. 什么情况下会Rebalance

    • 只有consumer成员数量、订阅topic分区数发生增减才会触发

9. Producer

9.1. 谈一谈 Kafka Producer 的 acks 参数的作用。(producer发个broker)
  • acks=0
    • Producer不会等待Broker的确认反馈,不关心Broker是否正确的将发送来的数据持久化,所以在这种模式下,很有可能会丢失数据。因为如果Broker挂了,Producer不会被通知到,所以还会不停的发送数据导致数据丢失。在对数据完整性需求不强烈的场景下,这种模式可以提高性能。
  • acks=1
    • 默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:
      • 不保证Replicas也持久化了数据。
      • 当Producer没有收到Broker的确认反馈时,Producer会尝试重新发送数据。
      • 当Leader Broker挂了,但是Replicas又没有持久化数据时,还是会丢失数据。
      • 该模式只能说是可以有效防止数据丢失。
  • acks=all
    • Producer同样需要等待Broker的确认,但是确认更为严格,需要所有的Partition(Leader + Replicas)都持久化数据后才返回确认信息。这种模式下,只要Replicas足够多,数据基本不会丢失。
    • 在该模式下,还有一个重要的参数min.insync.replicas需要配置。该参数的意思是当acks=all时,至少有多少个Replicas需要确认已成功持久化数据,这个Replicas数量也包括Leader。
9.2. 对producer的retry理解
  • 有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置retries这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。

但是当设置了retries参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。

这里又会引出一个参数max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。

如果想在设置了retries还要严格控制Message顺序,可以把max.in.flight.requests.per.connection设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。

Kafka在0.11版本之后,就为我们提供了定义幂等Producer的能力

  • retries=Integer.MAX_VALUE
  • max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1)
  • max.in.flight.requests.per.connection=5 (Kafka >= v1.1)
  • acks=all
9.3. 对Message Batch的理解

max.in.flight.requests.per.connection参数,默认会在Broker那排队5条Message,那么如果第六条来了怎么办呢?这时候Kafka会自动开启批量处理Message的模式,将这6条Message作为一个批次进行处理。这一个批次可以看作是一次Message处理请求。

开启批量模式后,会引出两个参数:

  • linger.ms:每次批量处理的间隔时间。如果设为5,那么就是每5毫秒对Message进行一次批量处理。
  • batch.size:每个批次的最大字节数,默认是16KB,可以设置为32KB或者64KB,可以提高性能。
9.4. 对 Producer Buffer的理解

在大多数情况下,Consumer消费Message的速率是远不如Producer生产Message的速率的。所以Producer有一个缓存机制,将Broker还没来得及接收的Message缓存在内存中。缓存的大小可以通过buffer.memory配置,默认大小是32MB。默认存储时间为7天,这个时间可以通过设置Broker的offset.retention.minutes属性改变。

如果Producer的缓存被打满后,Producer会被阻塞,阻塞的最大时间可以通过max.block.ms配置,默认大小是60秒。

概括一下,就是当Producer生产Message的速率大于Broker接收Message(Consumer消费数据)的速率时,Producer会把Broker还没来得及接收的Message存在缓存里(内存),当存满设置的缓存大小后,Producer将不再发送Message给Broker,也就是进入阻塞状态,如果在设置的阻塞时间内,缓存还没有被释放出有用空间,那么Producer将抛出异常。

9.5. 幂等生产者

消息交付可靠性保障 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。(kafka默认)
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了

幂等性 Producer 作用范围

  • 它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true。
  • 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。

producer端可能发送重复消息,broker端有一套机制来去重(幂等性依赖seq number机制,事务依赖各种marker来标记)

  • Producer 发送消息到 Broker 时,会根据Paritition 机制选择将其存储到哪一个Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition里,这样就实现了负载均衡。指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值。没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition 值。既没有 Partition 有没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin 轮询算法。

10. 调优

10.1. swap 的调优
  • swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

11. 其他

  1. kafka如何支持延迟消息队列
    • 利用定时任务调度利用定时任务来实现延迟消息是最好、最简单的办法。对于一个延迟消息来说,一个延迟到 30 分钟后才可以被消费的消息,也可以认为是 30 分钟后才可以发送。也就是说,你可以设定一个定时任务,这个任务会在 30 分钟后把消息发送到消息服务器上。

每个分区独立一个文件存储,在分区数量较多时会退化成全局磁盘随机I/O,这也是Kafka在多Partition时吞吐量大幅下降的原因~

  • 分区很多,并且都存在读写的场景才会触发。
  • 为什么Kafka分区过多会导致顺序读写变为随机读写
    • 因为数据是写入到硬盘的。 如果同时有很多个文件在同时往硬盘去读写的话。从硬盘的角度来看的话,就是同时在硬盘的不同位置去读写,此时硬盘就得去调度不同位置的读写。即使是SSD和NVME的盘,这种在频繁的在硬盘不同位置的读写就是会降低性能。从硬盘角度来看,就是在不同位置的随机读写。

当 Topic 的消息写入存在倾斜,某些分区消息堆积很多,此时选择哪种分区消费模式可以解决问题?

  • 如果数据可以丢弃,那么可以通过重置消费位点到最新来解决历史堆积,让消费者可以消费新的数据。不过,这个方案有缺点,重置位点之前的数据会丢失,如果消费性能还是跟不上的话,那么后续还是会堆积。
  • 如果数据不能丢弃,不用保证消费顺序,那么可以将消费模式切换到共享消费模式,则有多个消费者同时消费一个分区,可以极大地提升消费速度,还可以通过横向增加消费者,从根本上解决堆积问题。
  • 如果数据不能丢弃,且需要保证消费顺序,那么就只能从发送端入手,分析为何发送端写入倾斜,然后解决写入倾斜的问题。

Kafka 从生产到消费的全过程

Kafka 的生产到消费总共经过生产者、Broker、消费者三个模块。大致的流程如下:

  • 在生产端,客户端会先和 Broker 建立 TCP 连接,然后通过 Kafka 协议访问 Broker 的 Metadata 接口获取到集群的元数据信息。接着生产者会向 Topic 或分区发送数据,如果是发送到 Topic,那么在客户端会有消息分区分配的过程。因为 Kafka 协议具有批量发送语义,所以客户端会先在客户端缓存数据,然后根据一定的策略,通过异步线程将数据发送到 Broker。
  • Broker 接收到数据后,会根据 Kafka 协议解析出请求内容,做好数据校验,然后重整数据结构,将数据按照分区的维度写入到底层不同的文件中。如果分区配置了副本,则消息数据会被同步到不同的 Broker 中进行保存。
  • 在消费端,Kafka 提供了消费分组消费和指定分区消费两种模式。消费端也会先经过寻址拿到完整的元数据信息,然后连接上不同的 Broker。如果是消费分组模式消费,则需要经过重平衡、消费分区分配流程,然后连接上对应的分区的 Leader,接着调用 Broker 的 Fetch 接口进行消费。最后一步也是需要提交消费进度来保存消费信息。

哪些环节会存在性能瓶颈和数据可靠性风险?

影响消息队列性能和可靠性的因素很多

  • 网络带宽与延迟:消息队列本质上还是一个I/O密集型系统,内部没有太多复杂的计算逻辑,因此网络无论对Producer、Broker还是Consumer来说都比较重要,网络一抖动,全链路的吞吐量可能就会受影响。
  • Producer的发送模式:选择Oneway/Sync/Async不同的发送模式,会直接影响Producer的性能和可靠性。
  • Broker的物理硬件:特别是磁盘和内存,会直接关系到Broker的存储和消费性能。
  • Consumer的Rebalance:在Rebalance期间,整个消费会暂停,因此如何最大程度降低Rebalance的影响,对Consumer端来说比较重要。

Kafka 集群中修改配置 / 权限操作的流程?

Kafka 修改配置 / 权限的实现,是每个 Broker 直接去监听 Broker 中的节点。Broker 会直接监听 ZooKeeper 上的节点,然后根据 Hook 到的信息,做对应的操作。比如修改集群和 Topic 配置,就是 Broker 通过直接监听 ZooKeeper 的不同子节点来实现的。这种方式的好处是,Broker 直接监听 ZooKeeper,避免 Controller 转发一道,从而避免让 Controller 成为瓶颈,整体链路更短,出问题的概率也更低。

  1. 同一个Group中的不同Consumer实例可以订阅不同的Topic吗

可以的。虽然在实际使用中可能更多的还是同一个group的多个实例订阅相同的topic。

可能无法消费到这个 consumer 没有订阅的主题, 导致某个 consumer 挂掉之后, 有些消息无法消费

Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

消费组相关

开始分区1被消费者A消费,rebalance 后被消费者B消费,那么消费者B是对分区从头开始消费还是继承消费者A的位移继续消费?

  • 如果A提交了位移,那么rebalance过后B从A提交的位移处继续消费。如果A没有提交过位移,那么视consumer端参数auto.offset.reset值而定

每次重启一个服务,都会产生下线一次rebalance,上线一次rebalance?

  • 社区于2.4引入了静态成员变量,可以规避这个问题

rebalance时,全部实例都要参与重新分配。是否能参考 一致性哈希算法,尽量减少对全局的影响

———————————-分割线

  1. 解释Kafka的Zookeeper是什么?我们可以在没有Zookeeper的情况下使用Kafka吗?
    • Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。不,不可能越过Zookeeper,直接联系Kafka broker。一旦Zookeeper停止工作,它就不能服务客户端请求。·Zookeeper主要用于在集群中不同节点之间进行通信
  2. Consumer的水平扩展是如何实现的呢?
    • Kafka支持Consumer的水平扩展能力。可以让多个Consumer加入一个Consumer Group(消费组),在一个Consumer Group中,每个分区只能分配给一个Consumer消费者,当Kafka服务端通过增加分区数量进行水平扩展后, 可以向Consumer Group中增加新的Consumer来提高整个Consumer Group的消费能力。当Consumer Group中的一个Consumer出现故障下线时,会通过Rebalance操作将下线Consumer,它负责处理的分区将分配给其他Consumer继续处理。当下线Consumer重新上线加人Consumer Group时,会再进行一次Rebalance操作,重新分配分区。
  3. 为了避免磁盘被占满,Katka会周期性地删除陈旧的消息,删除策略是什么呢?
    • Kafka中有两种“保留策略”:一种是根据消息保留的时间,当消息在Kafka中保存的时间超过了指定时间,就可以被删除; 另一种是根据Topic存储的数据大小,当Topic所占的日志文件大小大于一个阈值,则可以开始删除最旧的消息。Kafka会启动一个后台线程,定期检查是否存在可以删除的消息。“保留策略”的配置是非常灵活的,可以有全局的配置,也可以针对Topic进行配置覆盖全局配置。
  4. 什么是broker?它的作用是什么?
    • 一个单独的Kafka Server就是一个Broker。Broker的主要工作就是接收生产者发过来的消息,分配offset,之后保存到磁盘中。同时,接收消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。在一般的生产环境中,一个Broker独占一台物理服务器
  5. 同一分区的多个副本包括的消息是否是一致的?
    • 每个副本中包含的消息是一样的,但是在同一时刻,副本之间其实并不是完全一样的。
  6. Consumer Group中消费者的数量是不是越多越好呢?
    • Consumer Group中消费者的数量并不是越多越好,当其中消费者数量超过分区的数量时,会导致有消费者分配不到分区,从而造成消费者的浪费。
  7. 详述一下消息在kafka中的生命周期?
    • 生产者会根据业务逻辑产生消息,之后根据路由规则将消息发送到指定分区的Leader副本所在的Broker上。在Kafka服务端接收到消息后,会将消息追加到Log中保存,之后Follower副本会与Leader副本进行同步,当ISR集合中所有副本都完成了此消息的同步后,则Leader副本的HW会增加,并向生产者返回响应。 消费者加人到Consumer Group时,会触发Rebalance操作将分区分配给不同的消费者消费。随后,消费者会恢复其消费位置,并向Kafka服务端发送拉取消息的请求,Leader副本会验证请求的offset以及其他相关信息,最后返回消息。

某个分区的leader挂了,在切换选举到另外副本为leader时,这个副本还没同步之前的leader数据,这样数据就丢了 ?

对于producer而言,如果在乎数据持久性,那么应该设置acks=all,这样当出现你说的这个情况时,producer会被显式通知消息发送失败,从而可以重试。

Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么

Kafka中的HW、LEO、LSO、LW等分别代表什么?

Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

如果我指定了一个offset,Kafka怎么查找到对应的消息?

聊一聊你对Kafka的Log Retention的理解

聊一聊你对Kafka的Log Compaction的理解

聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)

聊一聊Kafka的延时操作的原理

聊一聊Kafka控制器的作用

消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)

Kafka中的事务是怎么实现的(这题我去面试6加被问4次,照着答案念也要念十几分钟,面试官简直凑不要脸。实在记不住的话…只要简历上不写精通Kafka一般不会问到,我简历上写的是“熟悉Kafka,了解RabbitMQ….”)

Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)

Kafka的那些设计让它有如此高的性能?

在使用Kafka的过程中遇到过什么困难?怎么解决的?

怎么样才能确保Kafka极大程度上的可靠性?

kafka认为写入成功是指写入页缓存成功还是数据刷到磁盘成功算成功呢?还是上次刷盘宕机失败的问题,页缓存的数据如果刷盘失败,是不是就丢了?这个异常会不会响应给生产者让其重发呢?

写入到页缓存即认为成功。如果在flush之前机器就宕机了,的确这条数据在broker上就算丢失了。producer端表现如何取决于acks的设定。如果是acks=1而恰恰是leader broker在flush前宕机,那么的确有可能消息就丢失了,而且producer端不会重发——因为它认为是成功了。

Producer 通过 metadata.max.age.ms定期更新元数据,在连接多个broker的情况下,producer是如何决定向哪个broker发起该请求?

向它认为当前负载最少的节点发送请求,所谓负载最少就是指未完成请求数最少的broker

开始分区1被消费者A消费,rebalance 后被消费者B消费,那么消费者B是对分区从头开始消费还是继承消费者A的位移继续消费?

如果A提交了位移,那么rebalance过后B从A提交的位移处继续消费。如果A没有提交过位移,那么视consumer端参数auto.offset.reset值而定

  1. 为了提高效率,Kafka以批量的方式写入。一个batch就是一组消息的集合,这一组的数据都会进入同一个topic 和 partition(这个是根据 producer的配置来定的)。每一个消息都进行一次网络传输会很消耗性能,因此把消息收集到一起,再同时处理就高效的多了。当然,这样会引入更高的延迟以及吞吐量: batch 越大,同一时间处理的消息就越多。batch 通常都会进行压缩,这样在传输以及存储的时候效率都更高一些。
  2. 消息都是以主题 Topic 的方式组织在一起,Topic 也可以理解成传统数据库里的表,或者文件系统里的一个目录。一个主题由 broker 上的一个或者多个 Partition 分区组成。在 Kafka 中数据是以 Log 的方式存储,一个 partition 就是一个单独的 Log。消息通过追加的方式写入日志文件,读取的时候则是从头开始 按照顺序读取。注意,一个主题通常都是由多个分区组成的,每个分区内部保证消息的顺序行,分区之间是不保证顺序的。如果你想要 kafka 中的数据按照时 间的先后顺序进行存储,那么可以设置分区数为 1。如下图所示,一个主题由 4 个分区组成,数据都以追加的方式写入这四个文件。分区的方式为 Kafka 提供 了良好的扩展性,每个分区都可以放在独立的服务器上,这样就相当于主题可以在多个机器间水平扩展,相对于单独的服务器,性能更好。
  3. Kafka 中主要有两种使用者:Producer 和 consumer
    • Producer 用来创建消息。在发布订阅系统中,他们也被叫做 Publisher 发布者或 writer 写作者。通常情况下,消息都会进入特定的主题。默认情况下,生产者 不关系消息到底进入哪个分区,它会自动在多个分区间负载均衡。也有的时候,消息会进入特定的一个分区中。一般都是通过消息的 key 使用哈希的方式确定 它进入哪一个分区。这就意味着如果所有的消息都给定相同的 key,那么他们最终会进入同一个分区。生产者也可以使用自定义的分区器,这样消息可以进入 特定的分区。
  4. Consumer 读取消息。在发布订阅系统中,也叫做 subscriber 订阅者或者 reader 阅读者。消费者订阅一个或者多个主题,然后按照顺序读取主题中的数据。消 费者需要记录已经读取到消息的位置,这个位置也被叫做 offset。每个消息在给定的分区中只有唯一固定的 offset。通过存储最后消费的 Offset,消费者应用 在重启或者停止之后,还可以继续从之前的位置读取。保存的机制可以是 zookeeper,或者 kafka 自己。
  5. 消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费 者读取,在下图中,有一个由三个消费者组成的 grouop,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也 可以叫做某个消费者是某个分区的拥有者。
  6. 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的 消费者读取的分区。
  7. 单独的 kafka 服务器也叫做 broker,Broker 从生产者那里获取消息,分配 offset,然后提交存储到磁盘年。他也会提供消费者,让消费者读取分区上的消息, 并把存储的消息传给消费者。依赖于一些精简资源,单独的 broker 也可以轻松的支持每秒数千个分区和百万级的消息。
  8. Kafka 的 broker 支持集群模式,在 Broker 组成的集群中,有一个节点也被叫做控制器(是在活跃的节点中自动选择的)。这个 controller 控制器负责管理整个 集群的操作,包括分区的分配、失败节点的检测等。一个 partition 只能出现在一个 broker 节点上,并且这个 Broker 也被叫做分区的 leader。一个分区可以分 配多个 Broker,这样可以做到多个机器之间备份的效果。这种多机备份在其中一个 broker 失败的时候,可以自动选举出其他的 broker 提供服务。然而, producer 和 consumer 都必须连接 leader 才能正常工作。
  9. Kafka 的一个重要特性就是支持数据的过期删除,数据可以在 Broker 上保留一段时间。Kafka 的 broker 支持针对 topic 设置保存的机制,可以按照大小配置也 可以按照时间配置。一旦达到其中的一个限制,可能是时间过期也可能是大小超过配置的数值,那么这部分的数据都会被清除掉。每个 topic 都可以配置它自 己的过期配置,因此消息可以按照业务的需要进行持久化保留。比如,一个数据追踪分析的 topic 可以保留几天时间,一些应用的指标信息则只需要保存几个 小时。topic 支持日志数据的压缩,这样 kafka 仅仅会保留最后一条日志生成的 key。这在修改日志类型的时候会非常有用。

假设有个 Kafka 集群由 2 台 Broker 组成,有个主题有 5 个分区,当一个消费该主题的消费者程序启动时,你认为该程序会创建多少个 Socket 连接?为什么?

  • 整个生命周期里会建立4个连接,进入稳定的消费过程后,同时保持3个连接,以下是详细。 第一类连接:确定协调者和获取集群元数据。 一个,初期的时候建立,当第三类连接建立起来之后,这个连接会被关闭。 第二类连接:连接协调者,令其执行组成员管理操作。 一个 第三类连接:执行实际的消息获取。 两个分别会跟两台broker机器建立一个连接,总共两个TCP连接,同一个broker机器的不同分区可以复用一个socket。只有2个 Broker,5个分区的领导者副本,由zookeeper分配Leader,所以默认是均匀的,故有4个TCP连接。

你是如何解决有序消息这个问题的?用的是哪种方案?

  • 只需要确保同一个业务的消息发送到同一个分区就可以保证同一个业务的消息是有序的。

怎么保证同一个业务的消息必然发送到同一个分区呢?

  • 只需要生产者在发送消息的时候,根据业务特征,比如说业务 ID 计算出目标分区,在发送的时候显式地指定分区就可以了。

如果你用的是单分区解决方案,那么有没有消息积压问题?如果有,你是怎么解决的?

如果你用的是多分区解决方案,那么有没有分区负载不均衡的问题?如果有,你是怎么解决的?

增加分区会引起消息失序

  • 它还有另外一个缺点,就是如果中间有增加新的分区,那么就可能引起消息失序。比如说最开始 id 为 3 的订单消息 msg1 发到分区 0 上,但是这时候很不幸分区 0 上积攒了很多消息,所以 msg1 迟迟得不到消费。
  • 紧接着我们扩容,增加了一个新的分区。如果这时候来了一个消息 msg2,那么它会被转发到分区 3 上。分区 3 上面没有积攒什么数据,所以消费者 3 直接就消费了这个消息。
  • 这时候我们发现,本来 msg1 应该先于 msg2 被消费。而增加分区之后 msg2 反而被先消费了。这就是一个典型的消息失序场景。

针对这个缺点我们也可以进一步提出解决方案。这个消息失序的场景解决起来倒也很简单,就是新增加了分区之后,这些新分区的消费者先等一段时间,比如说三分钟,确保同一个业务在其他分区上的消息已经被消费了。

要解决这个问题也很容易。对于新加入的分区,可以暂停消费一段时间。比如说在前面的例子中,如果我们估算 msg1 会在一分钟内被消费,那么新加入的分区的消费者可以在三分钟后再开始消费。那么大概率 msg1 就会先于 msg2 消费。不过这种等待的解决方式并不能解决根本问题,只能说是很大程度上缓解了问题。但是本身增加分区也是一个很不常见的操作,再叠加消息失序的概率也很低,所以我们也可以通过监控发现这种失序场景,然后再手工修复一下就可以了。