Charlie Feng's Tech Space

You will survive with skills

Redis Fundamentals

Key Concepts

  • In-memory database
  • Data structures: string, hash, set, sorted set, list, geo, hyperloglog
  • Cluster modes: master-slave, sentinel, cluster sharding
  • Performance optimization through caching

Why Use Redis?

Performance: When encountering SQL queries that take a long time to execute and have results that don’t change frequently, it’s ideal to store the results in cache. This allows subsequent requests to read from cache, enabling rapid response times.

Concurrency: Under high concurrency, if all requests directly access the database, connection exceptions will occur. Redis serves as a buffer, allowing requests to access Redis first instead of directly hitting the database. (MySQL supports ~1,500 QPS, while Redis supports 20,000-50,000 QPS)

Redis Use Cases

  • Caching
  • Flash sales/spike traffic handling
  • Distributed locking

Redis Single-Threading Model

Key Concepts

  • Thread tasks: command processing, I/O handling, persistence, data synchronization
  • Version 6.0+: configurable multi-threading support
  • epoll + reactor pattern

High Performance Reasons

Memory operations: All data operations occur in memory

I/O Model on Linux: Uses epoll combined with reactor pattern

  • epoll fundamentals: Manages multiple socket file descriptors

    • Red-black tree structure maintains all monitored file descriptors
    • Doubly linked list maintains the ready list
    • Interrupt mechanism adds ready file descriptors
    • Key APIs: epoll_create, epoll_ctl, epoll_wait
    • Advantages over poll/select: More efficient for large numbers of connections
  • Reactor pattern:

    • Reactor (Dispatcher): Calls epoll to get available file descriptors and dispatches events
    • Acceptor: Handles connection creation events
    • Handler: Processes I/O read/write events

Redis Data Types and Underlying Structures

String

  • Implementation: SDS (Simple Dynamic String)
  • Use cases: User info caching, counters, distributed locks (SETNX)

Hash

  • Implementation: ziplist (small data) + hashtable (large data)
  • Use cases: Storing objects

List

  • Implementation: Doubly linked list or ziplist
  • Use cases: Message queues, latest articles list

Set

  • Implementation: intset (integer set) or hashtable
  • Use cases: Tag systems, mutual friends (SINTER for intersection)

Sorted Set (ZSet)

  • Implementation: ziplist + skiplist (skip list)
  • Use cases: Leaderboards (ZADD/ZRANGE), delayed queues

Redis Data Persistence

Key Concepts

  • AOF (Append Only File)
  • RDB (Redis Database)
  • Hybrid mode

RDB (Redis Database Backup)

  • Mechanism: Periodically generates binary snapshot files of memory data
  • Process: Forks child process to avoid blocking main thread
  • Frequency: Executes BGSAVE every 5+ minutes
  • Drawback: Potential data loss between snapshots

AOF (Append Only File)

  • Mechanism: Records all write operation commands
  • Sync strategies:
    • Every second (appendfsync everysec) - Default, good balance
    • Every modification (appendfsync always) - Safest but slowest
    • No sync (appendfsync no) - Fastest but risky
  • Rewrite mechanism: Compacts AOF file by removing redundant commands

Comparison

  • RDB: Fast recovery, smaller files, but potential data loss during failures
  • AOF: Better data integrity, but larger files and slower recovery

Redis Cluster Deployment Modes

Master-Slave Replication

  • Read operations: Both master and slave nodes can handle reads
  • Write operations: Only master handles writes, then syncs to slaves
  • Benefits: Read scaling, basic fault tolerance

Sentinel Mode

  • Purpose: Automatic failover when master fails

  • Key functions:

    • Monitoring: Continuously checks master/slave health
    • Election: Selects new master when current master fails
    • Notification: Informs clients of topology changes
  • Failure detection:

    • Subjective down: Single sentinel marks master as down
    • Objective down: Majority of sentinels agree master is down
  • Master selection criteria:

    1. Slave priority configuration
    2. Replication progress (most up-to-date)
    3. Smallest slave ID

Cluster Sharding

  • Purpose: Handles large datasets (>25GB) and utilizes multi-core CPUs
  • Hash slots: Uses 16,384 fixed hash slots for data distribution
  • Benefits:
    • Horizontal scaling
    • Automatic failover
    • No single point of failure

Caching Patterns and Consistency

Key Concepts

  • Cache-Aside pattern
  • Read Through, Write Through, Write Back
  • Cache invalidation strategies

Cache-Aside Pattern

  • Read: Check cache first; if miss, query database and populate cache
  • Write: Update database first, then delete cache

Ensuring Cache-Database Consistency

Delayed Double Delete:

  1. Update database
  2. Delete cache immediately
  3. Wait brief period (100-500ms)
  4. Delete cache again
  5. Trade-off: Lower cache hit rate for better consistency

Fallback strategies:

  • Set cache expiration times
  • Use message queues for asynchronous synchronization

Cache Problems: Penetration, Breakdown, Avalanche

Cache Avalanche

  • Problem: Many cache keys expire simultaneously or Redis instance crashes
  • Solutions:
    • Random expiration times
    • Circuit breaker and rate limiting
    • High-availability cluster deployment
    • Service degradation

Cache Penetration

  • Problem: Queries for non-existent data bypass cache and hit database
  • Solutions:
    • Cache null/default values with short TTL
    • Bloom Filter:
      • Data structure: Bit array + multiple hash functions
      • Write: Hash element multiple times, set corresponding bits to 1
      • Query: If all hash positions are 1, element might exist (false positives possible)
    • Input validation at application layer

Cache Breakdown (Hotspot Invalid)

  • Problem: Single popular cache key expires, causing traffic spike to database
  • Solutions:
    • Never expire hot data
    • Use distributed locks to prevent multiple database hits
    • Pre-warming cache before expiration

Distributed Locking with Redis

Key Concepts

  • SETNX: Only one client can successfully set the same key
  • Expiration time: Prevents deadlocks
  • Lock renewal: Extends lock duration for long-running operations

Lock Retry on Failure

  • Wait time determination: Based on 99th percentile business execution time
  • Implementation approaches:
    • Sleep-based retry
    • Event-driven (listen for DEL events)
    • Lua script for atomic timeout retry

Expiration Time Management

  • Why needed: Prevents deadlocks when lock holder crashes
  • Setting strategy:
    • Analyze 99% of business operations completion time
    • Set 2x safety margin (e.g., if 99% complete in 1s, set 2s expiration)
    • For critical operations, consider 10s or 1 minute

Lock Renewal

  • When needed: Business operations exceed expiration time
  • Implementation:
    • Reset expiration before current expiration
    • Daemon thread periodically checks and renews
    • Redisson watchdog mechanism: Automatic renewal

Lock Release

  • Verification: Always verify lock ownership before release
  • Prevention: Avoid releasing locks held by other threads
  • Implementation: Use Lua script for atomic check-and-delete

Advanced Patterns

  • Redlock Algorithm: Distributed consensus for multiple Redis instances
  • SingleFlight Pattern: Prevents cache stampede by allowing only one request for the same resource

Best Practices Summary

  1. Choose appropriate data structures based on use case
  2. Implement proper persistence strategy (RDB + AOF hybrid recommended)
  3. Design for high availability with clustering and replication
  4. Handle cache problems proactively with proper expiration and fallback strategies
  5. Use distributed locks carefully with proper timeout and renewal mechanisms
  6. Monitor and optimize performance regularly

  • Kafka基础知识

  • 关键词

    • 消息队列、流平台
    • 主题、分区、副本
    • 生产者、消费者、broker、Zookeeper
    • 消费者组、offset
    • ISR协议
  • 消息队列的使用场景

    • 异步、解耦、削峰
    • 性能、扩展性、可用性
    • 使用场景
      • 日志处理
      • 消息通信
      • 秒杀
      • 订单超时取消
  • kafka是一个分布式流处理平台,主要用于实时流数据的传输和处理。它可以将大量的消息和事件以分布式、持久化、高可靠性、高吞吐量的方式传输和存储。

  • 消费者组

    • kafka提供的可扩展且具有容错性的消费者机制。
    • 消费者组是一个有多个消费者实例构成的分组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有相同的组ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动的承担起该实例负责消费的分区。
  • 架构设计与实现

  • 关键词

    • WAL顺序写
    • 元数据管理
  • 协议和网络模块

  • 数据存储

    • 元数据存储
      • 目前,kafka使用zk存储集群元数据,进行成员管理、controller选举,以及其他一些管理类任务。
      • KIP-500提案之后,kafka将使用社区自研的基于raft的共识算法,替代zk,实现controller自选举。
    • 消息数据存储
      • kafka使用WAL(write ahead log)日志来存储消息。
  • 生产者与消费者

    • 分区分配策略
    • 批量写入
    • 消费者组
  • HTTP协议支持和管控操作

  • 集群构建

  • 数据可靠性

  • 可观测性

  • 如何保证消息有序

  • 关键词

    • WAL
    • 分区内有序
    • 数据不均匀
  • 什么场景需要用到有序消息

    • 下单场景,产生创建订单消息和完成支付消息,业务上要求同一个订单的创建订单消息应该优先于完成支付消息。
  • 全局有序使用单分区方案,有消息积压问题

  • 多分区方案,有分区负载均衡问题

    • 生产者根据业务特征选取同一个业务id的消息,写入到同一个分区
    • 热点用户的数据写入同一个分区,导致该分区QPS很高,消费者也不一定来得及消费导致消息积压。
    • 正确计算目标分区,解决数据不均匀问题
      • 类似redis的槽位分配机制
      • 一致性哈希算法
    • 在单分区增加到多分区的时候,消费新分区的消费者在启动的时候,并不是立刻消费,暂停消费一段时间(比如三分钟、三十分钟),等待旧分区积压的消息都消费完成,避免可能的失序问题。
  • 使用线程池消费数据时,如何保证消息有序

    • 根据业务特征选取同一个业务id的消息组成一组给同一个线程处理。
  • 如何解决消息积压

  • 关键词

    • 一个分区最多只能有一个消费者
    • 增加分区、创建新Topic
  • 分区数大于消费者数量时,增加消费者

  • 增加分区

    • 分区数量预估
  • 创建新Topic

  • 优化消费者性能

    • 消费者使用了分布式锁,考虑能否去掉(同一个业务的消息发送到同一个分区后只会有一个消费者消费)。
    • 消费逻辑降级处理,消息积压时使用快路径,比如使用缓存数据。
    • 异步消费,消费线程拉取一批消息后使用线程池执行消费逻辑。通过批量提交,解决消费者宕机的数据丢失问题。处理逻辑保证幂等,解决重复消费问题。通过异步线程重试或者重新写入Topic,解决部分失败问题,注意重新次数。
  • 生产者聚合多条消息,消费者使用批处理。

  • 如何保证消息不丢失

  • 关键词

    • 消息写入语义
    • 消费者commit
  • 消息丢失发送在哪个阶段

    • 生产存储消息
      • 写入语义控制参数:acks
      • ISR:指和主分区保持了主从同步的所有从分区。
      • 消息丢失场景:
        • 批量写入,客户端请求还未发送时,生产者宕机;
        • acks=0,消息发送成功,但是未写入broker时,broker宕机。
        • acks=1,数据写入主分区后,主分区所在broker宕机,数据未同步,发生主分区选举。
        • acks=all,允许unclean选举的情况下,如果ISR中没有任务分区,会选出没有数据的主分区。
        • acks=all,禁用unclean选举,无论主分区还是从分区,数据都只是写入到了机器的PageCache,broker宕机任然可能丢失消息。
      • 刷盘的参数控制
      • 确保发送方一定发了消息
        • 本地消息表+补偿机制
        • 消息回查
    • 消费消息
      • 异步消费,未提交offset消费者宕机,或者提交了消费逻辑未执行。
  • 如何保证不重复消费

  • 关键词

    • 幂等
    • 唯一索引
    • 异步检测
    • 布隆过滤器判断不存在key
    • redis记录key
  • 重复消费的原因

    • 生产者重复发送
    • 消费者处理完未提交宕机,重新启动后重新消费。
  • 设计幂等的消费逻辑

    • 业务表唯一索引
      • 开启本地事务将业务操作和插入数据到唯一索引两个操作提交,事务提交成功后提交消息。
    • 分库分表情况下(业务表、唯一索引表不在一个数据库)
      • 异步检测,定时扫描唯一索引表的状态数据,与业务数据比较。
    • 布隆过滤器 + redis + 唯一索引:请求成功后将该请求的key记录到数据库唯一索引,再记录到redis或布隆过滤器。
      • 布隆过滤器判断key不存在,那么处理请求
      • 查询redis是否存在,存在就直接返回,这个请求是重复请求
      • 数据库唯一索引冲突,则是重复请求
  • kafka的高吞吐实现

  • 关键词

    • 顺序读写(WAL)
    • 零拷贝(sendfile系统调用):0次CPU拷贝、2次DMA拷贝,2次上下文切换
    • Page Cache:避免直接刷盘,同时缓解JVM的GC压力
    • 日志文件与索引文件分段:二分查找、稀疏索引
    • 批量发送
    • 数据压缩
  • kafka性能衰退的原因

    • Topic或者分区太多
      • 每个分区都有一个日志文件,不同分区之间就不是顺序写了。
      • 减少分区的使用
      • 合并Topic
  • 生产消息的批量处理

    • 在 Producer 端,如果要改善吞吐量,通常的标配是增加消息批次的大小以及批次缓存时间,即 batch.size 和 linger.ms。
    • linger.ms固定时间兜底
  • broker的高性能

    • kafka在写数据的时候,一方面基于OS层面的PageCache(os cache)来写数据,所以性能很高。
    • 另一方面,它采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的。
    • 零拷贝:从kafka消费数据的时候,数据读取的过程为:磁盘 -> os cache -> application cache -> os socket缓存 -> 网卡。其中数据从操作系统Cache里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝会操作系统的socket缓存里,这两次数据拷贝是没有必要的。零拷贝技术,省略了这两次数据拷贝,数据直接从os Cache发送到网卡。
  • kafka的性能优化

  • 关键词

    • 优化生产者、broker、消费者
  • 优化生产者

    • acks
      • 追求性能时acks=0或acks=1
      • 追求消息不丢失时只能acks=all
    • 调大批次
      • batch.size不是越大越好,实际项目中压测确定
      • linger.ms间隔时间
    • 调大buffer.memory缓冲池
      • 由于Topic或分区数太多时,可能导致缓冲池不够用
    • 压缩
      • 开启压缩
      • 选择合适的压缩算法
  • 优化broker

    • swap
      • vm.swappiness参数追求性能调小(默认60,可以调到1-10之间)。
    • 优化网络读写缓冲区
      • Socket 默认读写缓冲区大小
      • Socket 最大读写缓冲区大小
      • TCP 读写缓冲区
    • 磁盘io
      • 使用XFS文件系统,提升读写性能。
      • 禁用atime,文件读取不需要修改访问时间属性,提升性能。
    • 主从同步
      • num.replica.fetchers:从分区拉取数据的线程数量,默认是 1。可以考虑设置成 3。
      • replica.fetch.min.bytes:可以通过调大这个参数来避免小批量同步数据。
      • replica.fetch.max.bytes:这个可以调大,比如说调整到 5m,但是不要小于 message.max.byte,也就是不要小于消息的最大长度。
      • replica.fetch.wait.max.ms:如果主分区没有数据或者数据不够从分区的最大等待时间,可以考虑同步调大这个值和 replica.fetch.max.bytes。
    • JVM
      • Full GC会影响消息的写入性能,也可能触发重新选主,或影响ISR。
      • 调整堆内存大小(8G-16G),或CMS增大老年代。
      • 或者启用G1垃圾回收器,并设置-XX:MaxGCPauseMillis=200,减少停顿时间。
  • 优化消费者

    • 解决消息积压
  • 消息中间件对比

  • 关键词

    • RabbitMQ、RocketMQ、Kafka与Pulsar
    • 数据存储在不同的队列
    • 计算存储分离架构

  • Es基础知识

  • 关键词

    • 基于Lucene的Restful的分布式实时全文搜索引擎,快速存储、搜索、分析海量数据。

    • Index索引:存储数据的地方,类似mysql的表。

    • document文档:类似mysql种的一行数据,但是每个文档可以有不同的字段。

    • field字段:最小单位。

    • shard分片:数据切分为多个shard后可以分布在多台服务器,横向扩展,提升吞吐量和性能。

    • replica副本:一个shard可以创建多个replica副本,提供备用服务,提升搜索吞吐量和可用性。

    • textkeyword:是否分词。

    • queryfilter:是否计算分值。

  • Es的底层原理

  • 关键词

    • 倒排索引

    • 前缀树,也叫做字典树(Trie Tree)

    • refresh操作

    • flush操作

    • fsync操作

  • 什么倒排索引

每个文档都有对应的文档ID,文档内容可以表示为一系列关键词(Term)的集合。通过倒排索引,可以记录每个关键词在文档中出现的次数和位置。
倒排索引是关键词到文档ID、出现频率、位置的映射(posting list),每个关键词都对应一系列的文件,这些文件都出现了该关键词。
每个字段是分散统计的,也就是说每个字段都有一个posting list。关键词的查找基于前缀树,也叫做字典树(trie tree),es里面叫做Term Dictionary。为了节省内存空间,es对前缀树做了优化,压缩了公共前缀、后缀,就是所谓的FST(Finite State Transducers)

  • 写入数据

文档 -> Indexing Buffer -> Page Cache -> 磁盘

Translog -> Page Cache -> 磁盘

  • refresh刷新操作将索引缓存写入到 Page Cache,保存为segment段文件,一个段里面包含了多个文档,refresh默认1秒钟执行一次,此时文档才能够被检索,这也是称Es为近实时搜索的原因。

  • Page Cache通过异步刷新(fsync)将数据写入到磁盘文件。

  • 文档写入缓存的时候,同时会记录Translog,默认5秒钟固定间隔时间刷新到磁盘。

  • 前缀树

  • Es怎么保证高可用?

  • 关键词

    • Es高可用的核心是shard分片与replica副本

    • TransLog保障数据写入的高可用,避免掉电时的写入丢失

  • Es高可用的基本保证

Es高可用的核心是分片,并且每个分片都有主从之分,万一主分片崩溃了,还可以使用从分片,也就是副本分片,从而保证了最基本的可用性。
Es在写入数据的过程中,为了保证高性能,都是写入到自己的Buffer里面,后面再刷新到磁盘上。所以为了降低数据丢失的风险,es还额外写了一个Translog,类似于Mysql里的redo log。后面es崩溃之后,可以利用Translog来恢复数据。

  • Es高可用的额外优化

    • 限流保护节点:插件、网关或代理、客户端限流。

    • 利用消息队列削峰:数据写入数据库,监听binlog,发消息到MQ,消费消息并写入Es。

    • 保护协调节点:使用单一角色;分组与隔离。

    • 双集群部署:消息队列两个消费者双写到AB两个集群。

  • Es查询性能优化?

  • 关键词

    • jvm参数

    • 本地内存优化

  • 高性能方案

    • 常规方案

      • 优化垃圾回收

      • 优化swap

      • 文件描述符

    • 优化分页查询

    • 批量提交

    • 调大refresh时间间隔

    • 优化不必要字段

    • 冷热分离

  • JVM本地内存优化

场景:Elasticsearch 的 Lucene 索引占用大量堆外内存(Off-Heap),配置不当易引发 OOM。

优化方案:

  • 限制字段数据缓存(fielddata)大小,不超过堆内存的 30%

    • indices.fielddata.cache.size: 30%
  • 优化分片(shard)数量

    • 单个分片大小建议在 10-50GB 之间,过多分片会增加堆外内存开销

    • 例如:100GB 索引,分配 2-5 个分片。

  • Es的实战应用

  • 视频图片结构化分析系统使用ElasticSearch存储结构化信息支持目标检索功能

    • 技术选型分析

      • 需要存储哪些数据:视频分析结果数据(人形、车辆、人脸、骑行等)存储。

      • 支持目标检索

    • 部署架构与高可用:三节点部署集群。

    • 性能优化:Es批量写入、Es使用Kafka异步写入、refresh时间间隔配置修改。

    • 常见问题解决经验

      • 数据丢失与备份

      • 分页查询(/image_result/_search?scroll=10m)

      • 脑裂问题

      • 中文分词

        • ik中文分词器

        • 定制分词器:结巴分词+公安特征词库(10万+专用词汇)

消息引擎系统

Kafka Logo

  • Apache Kafka 是一款开源的消息引擎系统。
  • 消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。
  • Kafka 是消息引擎系统,也是分布式流处理平台。

Kafka 架构

Kafka Architecture

设计目标:

  • 提供一套 API 实现生产者和消费者;
  • 降低网络传输和磁盘存储开销;
  • 实现高伸缩性架构。

Kafka 版本号

  • 0.7 版本:只有基础消息队列功能,无副本;打死也不使用
  • 0.8 版本:增加了副本机制,新的 producer API;建议使用 0.8.2.2 版本;不建议使用 0.8.2.0 之后的 producer API
  • 0.9 版本:增加权限和认证,新的 consumer API,Kafka Connect 功能;不建议使用 consumer API;
  • 0.10 版本:引入 Kafka Streams 功能,bug 修复;建议版本0.10.2.2;建议使用新版 consumer API
  • 0.11 版本:producer API 幂等,事务 API,消息格式重构;建议版本 0.11.0.3;谨慎对待消息格式变化
  • 1.0 和 2.0 版本:Kafka Streams 改进;建议版本 2.0;

Kafka 的基本使用

如何做 kafka 线上集群部署方案?

https://time.geekbang.org/column/article/101107

集群参数配置

  • Broker 端参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka1,/home/kafka2,/home/kafka3
# Zookeeper connection string
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
# The address the socket server listens on
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise
advertised.listeners=PLAINTEXT://:9092
# Log retention settings
log.retention.hours=168
log.retention.ms=15552000000
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • Topic 参数

创建 Topic 时进行设置:

1
2
3
4
5
6
7
8
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic transaction \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=15552000000 \
--config max.message.bytes=5242880

修改 Topic 时设置:

1
2
3
4
5
6
bin/kafka-configs.sh \
--zookeeper localhost:2181 \
--entity-type topics \
--entity-name transaction \
--alter \
--add-config max.message.bytes=10485760
  • JVM 端参数
1
2
3
4
5
6
7
8
export KAFKA_HEAP_OPTS="--Xms6g --Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS=" \
-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true"

无消息丢失配置

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对”已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是”已提交”。这是最高等级的”已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是”已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

生产者分区策略

数据可靠性保证

消息幂等与事务

消费者组

Consumer Group

rebalance

  • session.timeout.ms
  • heartbeat.interval.ms

要保证 Consumer 实例在被判定为”dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

  • max.poll.interval.ms

This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

  • GC 参数

The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue callingpollwhile the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position.

位移提交

https://time.geekbang.org/column/article/106904

Offset Commit

  • 自动提交
1
2
enable.auto.commit=true
auto.commit.interval.ms=5000

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费
重复消费发生在 consumer 故障重启后,重新从磁盘读取 commited offset。只要 consumer 没有重启,不会发生重复消费,因为在运行过程中 consumer 会记录已获取的消息位移。

  • 手动提交
1
2
3
4
// 同步阻塞
consumer.commitSync()
// 异步回调
consumer.commitAsync(callback)

可同时使用 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。
比如我程序运行期间有多次异步提交没有成功,比如 101 的 offset 和 201 的 offset 没有提交成功,程序关闭的时候 501 的 offset 提交成功了,就代表前面 500 条还是消费成功了,只要最新的位移提交成功,就代表之前的消息都提交成功了。

消费者组消费进度监控

  • 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
1
2
3
4
./kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe \
--all-groups

Consumer Groups

  • 使用 Kafka Java Consumer API 编程。
  • 使用 Kafka 自带的 JMX 监控指标。

Kafka 的副本机制

Kafka 请求 Reactor 处理机制

broker 参数

1
2
3
4
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

Reactor Pattern

高水位和 Leader Epoch 机制

https://time.geekbang.org/column/article/112118

管理与监控

主题日常管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 创建主题
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--create \
--topic my_topic_name \
--partitions 1 \
--replication-factor 1

# 查看主题列表
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--list

# 查看主题详情
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--describe \
--topic <topic_name>

# 修改分区数
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--alter \
--topic <topic_name> \
--partitions <新分区数>

# 修改主题配置
bin/kafka-configs.sh \
--zookeeper zookeeper_host:port \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--add-config max.message.bytes=10485760

# 删除主题
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--delete \
--topic <topic_name>

动态参数配置

kafka 调优

https://time.geekbang.org/column/article/128184

OS tuning

  • Virtual Memory
1
2
3
4
5
6
7
8
9
10
# 查看当前配置
cat /proc/sys/vm/swappiness
cat /proc/sys/vm/dirty_background_ratio

# 修改配置
vi /etc/sysctl.conf
# The percentage of how likely the VM subsystem is to use swap space rather than dropping pages from the page cache.
vm.swappiness=1
# The percentage of the total amount of system memory, and setting this value to 5 is appropriate in many situations.
vm.dirty_background_ratio=5
1
2
3
4
5
6
7
# 查看当前状态
cat /proc/vmstat | egrep "dirty|writeback"
nr_dirty 11
nr_writeback 0
nr_writeback_temp 0
nr_dirty_threshold 67635
nr_dirty_background_threshold 33776
  • Disk
1
mount -o noatime
0%