Charlie Feng's Tech Space

You will survive with skills

  • Es基础知识

  • 关键词

    • 基于Lucene的Restful的分布式实时全文搜索引擎,快速存储、搜索、分析海量数据。

    • Index索引:存储数据的地方,类似mysql的表。

    • document文档:类似mysql种的一行数据,但是每个文档可以有不同的字段。

    • field字段:最小单位。

    • shard分片:数据切分为多个shard后可以分布在多台服务器,横向扩展,提升吞吐量和性能。

    • replica副本:一个shard可以创建多个replica副本,提供备用服务,提升搜索吞吐量和可用性。

    • textkeyword:是否分词。

    • queryfilter:是否计算分值。

  • Es的底层原理

  • 关键词

    • 倒排索引

    • 前缀树,也叫做字典树(Trie Tree)

    • refresh操作

    • flush操作

    • fsync操作

  • 什么倒排索引

每个文档都有对应的文档ID,文档内容可以表示为一系列关键词(Term)的集合。通过倒排索引,可以记录每个关键词在文档中出现的次数和位置。
倒排索引是关键词到文档ID、出现频率、位置的映射(posting list),每个关键词都对应一系列的文件,这些文件都出现了该关键词。
每个字段是分散统计的,也就是说每个字段都有一个posting list。关键词的查找基于前缀树,也叫做字典树(trie tree),es里面叫做Term Dictionary。为了节省内存空间,es对前缀树做了优化,压缩了公共前缀、后缀,就是所谓的FST(Finite State Transducers)

  • 写入数据

文档 -> Indexing Buffer -> Page Cache -> 磁盘

Translog -> Page Cache -> 磁盘

  • refresh刷新操作将索引缓存写入到 Page Cache,保存为segment段文件,一个段里面包含了多个文档,refresh默认1秒钟执行一次,此时文档才能够被检索,这也是称Es为近实时搜索的原因。

  • Page Cache通过异步刷新(fsync)将数据写入到磁盘文件。

  • 文档写入缓存的时候,同时会记录Translog,默认5秒钟固定间隔时间刷新到磁盘。

  • 前缀树

  • Es怎么保证高可用?

  • 关键词

    • Es高可用的核心是shard分片与replica副本

    • TransLog保障数据写入的高可用,避免掉电时的写入丢失

  • Es高可用的基本保证

Es高可用的核心是分片,并且每个分片都有主从之分,万一主分片崩溃了,还可以使用从分片,也就是副本分片,从而保证了最基本的可用性。
Es在写入数据的过程中,为了保证高性能,都是写入到自己的Buffer里面,后面再刷新到磁盘上。所以为了降低数据丢失的风险,es还额外写了一个Translog,类似于Mysql里的redo log。后面es崩溃之后,可以利用Translog来恢复数据。

  • Es高可用的额外优化

    • 限流保护节点:插件、网关或代理、客户端限流。

    • 利用消息队列削峰:数据写入数据库,监听binlog,发消息到MQ,消费消息并写入Es。

    • 保护协调节点:使用单一角色;分组与隔离。

    • 双集群部署:消息队列两个消费者双写到AB两个集群。

  • Es查询性能优化?

  • 关键词

    • jvm参数

    • 本地内存优化

  • 高性能方案

    • 常规方案

      • 优化垃圾回收

      • 优化swap

      • 文件描述符

    • 优化分页查询

    • 批量提交

    • 调大refresh时间间隔

    • 优化不必要字段

    • 冷热分离

  • JVM本地内存优化

场景:Elasticsearch 的 Lucene 索引占用大量堆外内存(Off-Heap),配置不当易引发 OOM。

优化方案:

  • 限制字段数据缓存(fielddata)大小,不超过堆内存的 30%

    • indices.fielddata.cache.size: 30%
  • 优化分片(shard)数量

    • 单个分片大小建议在 10-50GB 之间,过多分片会增加堆外内存开销

    • 例如:100GB 索引,分配 2-5 个分片。

  • Es的实战应用

  • 视频图片结构化分析系统使用ElasticSearch存储结构化信息支持目标检索功能

    • 技术选型分析

      • 需要存储哪些数据:视频分析结果数据(人形、车辆、人脸、骑行等)存储。

      • 支持目标检索

    • 部署架构与高可用:三节点部署集群。

    • 性能优化:Es批量写入、Es使用Kafka异步写入、refresh时间间隔配置修改。

    • 常见问题解决经验

      • 数据丢失与备份

      • 分页查询(/image_result/_search?scroll=10m)

      • 脑裂问题

      • 中文分词

        • ik中文分词器

        • 定制分词器:结巴分词+公安特征词库(10万+专用词汇)

消息引擎系统

Kafka Logo

  • Apache Kafka 是一款开源的消息引擎系统。
  • 消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。
  • Kafka 是消息引擎系统,也是分布式流处理平台。

Kafka 架构

Kafka Architecture

设计目标:

  • 提供一套 API 实现生产者和消费者;
  • 降低网络传输和磁盘存储开销;
  • 实现高伸缩性架构。

Kafka 版本号

  • 0.7 版本:只有基础消息队列功能,无副本;打死也不使用
  • 0.8 版本:增加了副本机制,新的 producer API;建议使用 0.8.2.2 版本;不建议使用 0.8.2.0 之后的 producer API
  • 0.9 版本:增加权限和认证,新的 consumer API,Kafka Connect 功能;不建议使用 consumer API;
  • 0.10 版本:引入 Kafka Streams 功能,bug 修复;建议版本0.10.2.2;建议使用新版 consumer API
  • 0.11 版本:producer API 幂等,事务 API,消息格式重构;建议版本 0.11.0.3;谨慎对待消息格式变化
  • 1.0 和 2.0 版本:Kafka Streams 改进;建议版本 2.0;

Kafka 的基本使用

如何做 kafka 线上集群部署方案?

https://time.geekbang.org/column/article/101107

集群参数配置

  • Broker 端参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka1,/home/kafka2,/home/kafka3
# Zookeeper connection string
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
# The address the socket server listens on
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise
advertised.listeners=PLAINTEXT://:9092
# Log retention settings
log.retention.hours=168
log.retention.ms=15552000000
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
  • Topic 参数

创建 Topic 时进行设置:

1
2
3
4
5
6
7
8
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic transaction \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=15552000000 \
--config max.message.bytes=5242880

修改 Topic 时设置:

1
2
3
4
5
6
bin/kafka-configs.sh \
--zookeeper localhost:2181 \
--entity-type topics \
--entity-name transaction \
--alter \
--add-config max.message.bytes=10485760
  • JVM 端参数
1
2
3
4
5
6
7
8
export KAFKA_HEAP_OPTS="--Xms6g --Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS=" \
-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true"

无消息丢失配置

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对”已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是”已提交”。这是最高等级的”已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是”已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

生产者分区策略

数据可靠性保证

消息幂等与事务

消费者组

Consumer Group

rebalance

  • session.timeout.ms
  • heartbeat.interval.ms

要保证 Consumer 实例在被判定为”dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

  • max.poll.interval.ms

This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

  • GC 参数

The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue callingpollwhile the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position.

位移提交

https://time.geekbang.org/column/article/106904

Offset Commit

  • 自动提交
1
2
enable.auto.commit=true
auto.commit.interval.ms=5000

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费
重复消费发生在 consumer 故障重启后,重新从磁盘读取 commited offset。只要 consumer 没有重启,不会发生重复消费,因为在运行过程中 consumer 会记录已获取的消息位移。

  • 手动提交
1
2
3
4
// 同步阻塞
consumer.commitSync()
// 异步回调
consumer.commitAsync(callback)

可同时使用 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。
比如我程序运行期间有多次异步提交没有成功,比如 101 的 offset 和 201 的 offset 没有提交成功,程序关闭的时候 501 的 offset 提交成功了,就代表前面 500 条还是消费成功了,只要最新的位移提交成功,就代表之前的消息都提交成功了。

消费者组消费进度监控

  • 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
1
2
3
4
./kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe \
--all-groups

Consumer Groups

  • 使用 Kafka Java Consumer API 编程。
  • 使用 Kafka 自带的 JMX 监控指标。

Kafka 的副本机制

Kafka 请求 Reactor 处理机制

broker 参数

1
2
3
4
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

Reactor Pattern

高水位和 Leader Epoch 机制

https://time.geekbang.org/column/article/112118

管理与监控

主题日常管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 创建主题
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--create \
--topic my_topic_name \
--partitions 1 \
--replication-factor 1

# 查看主题列表
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--list

# 查看主题详情
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--describe \
--topic <topic_name>

# 修改分区数
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--alter \
--topic <topic_name> \
--partitions <新分区数>

# 修改主题配置
bin/kafka-configs.sh \
--zookeeper zookeeper_host:port \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--add-config max.message.bytes=10485760

# 删除主题
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--delete \
--topic <topic_name>

动态参数配置

kafka 调优

https://time.geekbang.org/column/article/128184

OS tuning

  • Virtual Memory
1
2
3
4
5
6
7
8
9
10
# 查看当前配置
cat /proc/sys/vm/swappiness
cat /proc/sys/vm/dirty_background_ratio

# 修改配置
vi /etc/sysctl.conf
# The percentage of how likely the VM subsystem is to use swap space rather than dropping pages from the page cache.
vm.swappiness=1
# The percentage of the total amount of system memory, and setting this value to 5 is appropriate in many situations.
vm.dirty_background_ratio=5
1
2
3
4
5
6
7
# 查看当前状态
cat /proc/vmstat | egrep "dirty|writeback"
nr_dirty 11
nr_writeback 0
nr_writeback_temp 0
nr_dirty_threshold 67635
nr_dirty_background_threshold 33776
  • Disk
1
mount -o noatime
0%