当前位置:网站首页>28 | 主题管理知多少?

28 | 主题管理知多少?

2022-06-10 01:31:00 久违の欢喜


Kafka 核心技术与实战

管理与监控

28 | 主题管理知多少?

主题日常管理

1. 创建主题( --create)

bin/kafka-topics.sh --bootstrap-server broker_host:port 
--create --topic my_topic_name  --partitions 1 --replication-factor 1

create 表明要创建主题,而 partitions 和 replication factor 分别设置了主题的分区数以及每个分区下的副本数。

从 Kafka 2.2 版本开始,社区推荐用 –bootstrap-server 参数替换 --zookeeper 参数,并且显式地将后者标记为“已过期”。

社区推荐使用 --bootstrap-server 而非 --zookeeper 的原因主要有两个:

  • 使用 --zookeeper 会绕过 Kafka 的安全体系。也就是说,即使为 Kafka 集群设置了安全认证,限制了主题的创建,如果使用 --zookeeper 的命令,依然能成功创建任意主题,不受认证体系的约束。
  • 使用 --bootstrap-server 与集群进行交互,越来越成为使用 Kafka 的标准姿势。换句话说,以后会有越来越少的命令和 API 需要与 ZooKeeper 进行连接。这样,只需要一套连接信息,就能与 Kafka 进行全方位的交互,不用像以前一样,必须同时维护 ZooKeeper 和 Broker 的连接信息。

2. 查询主题

查询所有主题的列表

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

查询单个主题的详细数据

bin/kafka-topics.sh --bootstrap-server broker_host:port 
--describe --topic <topic_name>

如果 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回所有“可见”主题的详细数据。

可见”,是指发起这个命令的用户能够看到的 Kafka 主题。如果指定了 --bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对命令发起者进行权限验证,然后返回它能看到的主题。否则,如果指定 --zookeeper 参数,那么默认会返回集群中所有的主题详细数据。

3. 变更主题

I. 修改主题分区

其实就是增加分区,目前 Kafka 不允许减少某个主题的分区数。

bin/kafka-topics.sh --bootstrap-server broker_host:port 
--alter --topic <topic_name> --partitions <新分区数>

II. 修改主题级别参数

bin/kafka-configs.sh --zookeeper zookeeper_host:port 
--entity-type topics --entity-name <topic_name> 
--alter --add-config max.message.bytes=10485760

设置常规的主题级别参数,还是使用 --zookeeper。 --bootstrap-server,是用来设置动态参数的。

III. 变更副本数

使用自带的 kafka-reassign-partitions 脚本,增加主题的副本数。

IV. 修改主题限速

主要是指设置 Leader 副本和 Follower 副本使用的带宽。

假设主题各个分区的 Leader 副本和 Follower 副本在处理副本同步时,要求不得占用超过 100MBps 的带宽。

要达到这个目的,先为该主题设置限速,即设置 Broker 端参数 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port 
--alter --add-config 
'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' 
--entity-type brokers --entity-name 0

这条命令结尾处的 --entity-name 指的是 Broker ID。倘若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么还要依次为 Broker 1、2、3 执行这条命令。

设置好上述的参数之后,还需要为该主题设置要限速的副本,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port 
--alter --add-config 
'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' 
--entity-type topics --entity-name test

这条命令结尾处的 --entity-name 指的是主题名称。

V. 主题分区迁移

使用 kafka-reassign-partitions 脚本,对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他 Broker 上。

4. 删除主题

bin/kafka-topics.sh --bootstrap-server broker_host:port 
--delete  --topic <topic_name>

删除操作是异步的,执行完这条命令不代表主题立即就被删除了,它仅仅是被标记成“已删除”状态而已。Kafka 会在后台默默地开启主题删除操作。

特殊主题的管理与运维

在 Kafka 0.11 之前,当 Kafka 自动创建该主题时,它会综合考虑当前运行的 Broker 台数和 Broker 端参数 offsets.topic.replication.factor 值,然后取两者的较小值作为该主题的副本数,但这就违背了用户设置 offsets.topic.replication.factor 的初衷。这正是很多用户感到困扰的地方:集群中有 100 台 Broker,offsets.topic.replication.factor 也设成了 3,为什么 __consumer_offsets 主题只有 1 个副本?其实,这就是因为这个主题是在只有一台 Broker 启动时被创建的。

在 0.11 版本之后,社区修正了这个问题。也就是说,0.11 之后,Kafka 会严格遵守 offsets.topic.replication.factor 值。如果当前运行的 Broker 数量小于 offsets.topic.replication.factor 值,Kafka 会创建主题失败,并显式抛出异常。

如果该主题的副本值已经是 1 了,能否把它增加到 3 呢?

第 1 步是创建一个 json 文件,显式提供 50 个分区对应的副本数。注意,replicas 中的 3 台 Broker 排列顺序不同,目的是将 Leader 副本均匀地分散在 Broker 上。该文件具体格式如下:

{
    "version":1, "partitions":[
  {
    "topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {
    "topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {
    "topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {
    "topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {
    "topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第 2 步是执行 kafka-reassign-partitions 脚本,命令如下:

bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port 
--reassignment-json-file reassign.json --execute

对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。通过下面的命令可以直接查看消费者组提交的位移数据:

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port 
--topic __consumer_offsets --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 
--from-beginning

除了查看位移提交数据,还可以直接读取该主题消息,查看消费者组的状态信息。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port 
--topic __consumer_offsets --formatter 
"kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" 
--from-beginning

常见主题错误处理

常见错误 1:主题删除失败

实际上,造成主题删除失败的原因有很多,最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。

如果是因为前者,通常重启对应的 Broker 之后,删除操作就能自动恢复;如果是因为后者,很可能两个操作会相互干扰。

不管什么原因,一旦碰到主题无法删除的问题,可以采用这样的方法:

  • 第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
  • 第 2 步,手动删除该主题在磁盘上的分区目录。
  • 第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller
    缓存。

在执行最后一步时,一定要谨慎,因为它可能造成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。

常见错误 2:__consumer_offsets 占用太多的磁盘。

一旦发现这个主题消耗了过多的磁盘空间,那么,一定要显式地用 jstack 命令查看一下 以 kafka-log-cleaner-thread 为前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那就只能重启相应的 Broker 了。

原网站

版权声明
本文为[久违の欢喜]所创,转载请带上原文链接,感谢
https://blog.csdn.net/TQ20160412/article/details/125202193