Kafka Notes for Interview

  • Kafka基础知识

  • 关键词

    • 消息队列、流平台
    • 主题、分区、副本
    • 生产者、消费者、broker、Zookeeper
    • 消费者组、offset
    • ISR协议
  • 消息队列的使用场景

    • 异步、解耦、削峰
    • 性能、扩展性、可用性
    • 使用场景
      • 日志处理
      • 消息通信
      • 秒杀
      • 订单超时取消
  • kafka是一个分布式流处理平台,主要用于实时流数据的传输和处理。它可以将大量的消息和事件以分布式、持久化、高可靠性、高吞吐量的方式传输和存储。

  • 消费者组

    • kafka提供的可扩展且具有容错性的消费者机制。
    • 消费者组是一个有多个消费者实例构成的分组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有相同的组ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动的承担起该实例负责消费的分区。
  • 架构设计与实现

  • 关键词

    • WAL顺序写
    • 元数据管理
  • 协议和网络模块

  • 数据存储

    • 元数据存储
      • 目前,kafka使用zk存储集群元数据,进行成员管理、controller选举,以及其他一些管理类任务。
      • KIP-500提案之后,kafka将使用社区自研的基于raft的共识算法,替代zk,实现controller自选举。
    • 消息数据存储
      • kafka使用WAL(write ahead log)日志来存储消息。
  • 生产者与消费者

    • 分区分配策略
    • 批量写入
    • 消费者组
  • HTTP协议支持和管控操作

  • 集群构建

  • 数据可靠性

  • 可观测性

  • 如何保证消息有序

  • 关键词

    • WAL
    • 分区内有序
    • 数据不均匀
  • 什么场景需要用到有序消息

    • 下单场景,产生创建订单消息和完成支付消息,业务上要求同一个订单的创建订单消息应该优先于完成支付消息。
  • 全局有序使用单分区方案,有消息积压问题

  • 多分区方案,有分区负载均衡问题

    • 生产者根据业务特征选取同一个业务id的消息,写入到同一个分区
    • 热点用户的数据写入同一个分区,导致该分区QPS很高,消费者也不一定来得及消费导致消息积压。
    • 正确计算目标分区,解决数据不均匀问题
      • 类似redis的槽位分配机制
      • 一致性哈希算法
    • 在单分区增加到多分区的时候,消费新分区的消费者在启动的时候,并不是立刻消费,暂停消费一段时间(比如三分钟、三十分钟),等待旧分区积压的消息都消费完成,避免可能的失序问题。
  • 使用线程池消费数据时,如何保证消息有序

    • 根据业务特征选取同一个业务id的消息组成一组给同一个线程处理。
  • 如何解决消息积压

  • 关键词

    • 一个分区最多只能有一个消费者
    • 增加分区、创建新Topic
  • 分区数大于消费者数量时,增加消费者

  • 增加分区

    • 分区数量预估
  • 创建新Topic

  • 优化消费者性能

    • 消费者使用了分布式锁,考虑能否去掉(同一个业务的消息发送到同一个分区后只会有一个消费者消费)。
    • 消费逻辑降级处理,消息积压时使用快路径,比如使用缓存数据。
    • 异步消费,消费线程拉取一批消息后使用线程池执行消费逻辑。通过批量提交,解决消费者宕机的数据丢失问题。处理逻辑保证幂等,解决重复消费问题。通过异步线程重试或者重新写入Topic,解决部分失败问题,注意重新次数。
  • 生产者聚合多条消息,消费者使用批处理。

  • 如何保证消息不丢失

  • 关键词

    • 消息写入语义
    • 消费者commit
  • 消息丢失发送在哪个阶段

    • 生产存储消息
      • 写入语义控制参数:acks
      • ISR:指和主分区保持了主从同步的所有从分区。
      • 消息丢失场景:
        • 批量写入,客户端请求还未发送时,生产者宕机;
        • acks=0,消息发送成功,但是未写入broker时,broker宕机。
        • acks=1,数据写入主分区后,主分区所在broker宕机,数据未同步,发生主分区选举。
        • acks=all,允许unclean选举的情况下,如果ISR中没有任务分区,会选出没有数据的主分区。
        • acks=all,禁用unclean选举,无论主分区还是从分区,数据都只是写入到了机器的PageCache,broker宕机任然可能丢失消息。
      • 刷盘的参数控制
      • 确保发送方一定发了消息
        • 本地消息表+补偿机制
        • 消息回查
    • 消费消息
      • 异步消费,未提交offset消费者宕机,或者提交了消费逻辑未执行。
  • 如何保证不重复消费

  • 关键词

    • 幂等
    • 唯一索引
    • 异步检测
    • 布隆过滤器判断不存在key
    • redis记录key
  • 重复消费的原因

    • 生产者重复发送
    • 消费者处理完未提交宕机,重新启动后重新消费。
  • 设计幂等的消费逻辑

    • 业务表唯一索引
      • 开启本地事务将业务操作和插入数据到唯一索引两个操作提交,事务提交成功后提交消息。
    • 分库分表情况下(业务表、唯一索引表不在一个数据库)
      • 异步检测,定时扫描唯一索引表的状态数据,与业务数据比较。
    • 布隆过滤器 + redis + 唯一索引:请求成功后将该请求的key记录到数据库唯一索引,再记录到redis或布隆过滤器。
      • 布隆过滤器判断key不存在,那么处理请求
      • 查询redis是否存在,存在就直接返回,这个请求是重复请求
      • 数据库唯一索引冲突,则是重复请求
  • kafka的高吞吐实现

  • 关键词

    • 顺序读写(WAL)
    • 零拷贝(sendfile系统调用):0次CPU拷贝、2次DMA拷贝,2次上下文切换
    • Page Cache:避免直接刷盘,同时缓解JVM的GC压力
    • 日志文件与索引文件分段:二分查找、稀疏索引
    • 批量发送
    • 数据压缩
  • kafka性能衰退的原因

    • Topic或者分区太多
      • 每个分区都有一个日志文件,不同分区之间就不是顺序写了。
      • 减少分区的使用
      • 合并Topic
  • 生产消息的批量处理

    • 在 Producer 端,如果要改善吞吐量,通常的标配是增加消息批次的大小以及批次缓存时间,即 batch.size 和 linger.ms。
    • linger.ms固定时间兜底
  • broker的高性能

    • kafka在写数据的时候,一方面基于OS层面的PageCache(os cache)来写数据,所以性能很高。
    • 另一方面,它采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的。
    • 零拷贝:从kafka消费数据的时候,数据读取的过程为:磁盘 -> os cache -> application cache -> os socket缓存 -> 网卡。其中数据从操作系统Cache里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝会操作系统的socket缓存里,这两次数据拷贝是没有必要的。零拷贝技术,省略了这两次数据拷贝,数据直接从os Cache发送到网卡。
  • kafka的性能优化

  • 关键词

    • 优化生产者、broker、消费者
  • 优化生产者

    • acks
      • 追求性能时acks=0或acks=1
      • 追求消息不丢失时只能acks=all
    • 调大批次
      • batch.size不是越大越好,实际项目中压测确定
      • linger.ms间隔时间
    • 调大buffer.memory缓冲池
      • 由于Topic或分区数太多时,可能导致缓冲池不够用
    • 压缩
      • 开启压缩
      • 选择合适的压缩算法
  • 优化broker

    • swap
      • vm.swappiness参数追求性能调小(默认60,可以调到1-10之间)。
    • 优化网络读写缓冲区
      • Socket 默认读写缓冲区大小
      • Socket 最大读写缓冲区大小
      • TCP 读写缓冲区
    • 磁盘io
      • 使用XFS文件系统,提升读写性能。
      • 禁用atime,文件读取不需要修改访问时间属性,提升性能。
    • 主从同步
      • num.replica.fetchers:从分区拉取数据的线程数量,默认是 1。可以考虑设置成 3。
      • replica.fetch.min.bytes:可以通过调大这个参数来避免小批量同步数据。
      • replica.fetch.max.bytes:这个可以调大,比如说调整到 5m,但是不要小于 message.max.byte,也就是不要小于消息的最大长度。
      • replica.fetch.wait.max.ms:如果主分区没有数据或者数据不够从分区的最大等待时间,可以考虑同步调大这个值和 replica.fetch.max.bytes。
    • JVM
      • Full GC会影响消息的写入性能,也可能触发重新选主,或影响ISR。
      • 调整堆内存大小(8G-16G),或CMS增大老年代。
      • 或者启用G1垃圾回收器,并设置-XX:MaxGCPauseMillis=200,减少停顿时间。
  • 优化消费者

    • 解决消息积压
  • 消息中间件对比

  • 关键词

    • RabbitMQ、RocketMQ、Kafka与Pulsar
    • 数据存储在不同的队列
    • 计算存储分离架构