当前位置:网站首页>第7章 __consumer_offsets topic
第7章 __consumer_offsets topic
2022-07-06 09:29:00 【留不住斜阳】
7.1 基本结构
key
- version:版本字段,不同kafka版本version不同
- group: 对应消费者组的groupid,这条消息要发送到__consumer_offsets的哪个分区,是由这个字段决定的
- topic:主题名称
- partition:主题的分区
value
- version:版本字段,不同kafka版本的version不同
- offset:这个group消费这个topic到哪个位置了
- metadata:自定义元数据信息
- commit_timestamp:提交消费位移的时间
- expire_timestamp:过期时间,当数据过期时会有一个定时任务去清理过期的消息
- 查询__consumer_offsets topic所有内容
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
0.11.0.0之前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0之后版本(含)
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- 获取指定consumer group位移信息
消息存储分区
Math.abs("test".hashCode()) % 50;//48, 表示test这个groupid的offset记录提交到了__consumer_offset的48号分区里
- 查看__consumer_offsets的数据结构
//kafka 0.11以后(含)
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 指定分区 --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
//kafka0.11以前
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 指定分区 --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
可以看到__consumer_offsets topic的每一日志项的格式都是
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.2 kafka 消费者offset存储方式
在kafka 0.9 版本开始提供了新的consumer及consumer group,位移的管理与保存机制发生了很大的变化,新版本的consumer默认将不再保存位移到zookeeper中,而是__consumer_offsets topic中
- 如果消费者根据java api来消费,也就是
kafka.javaapi.consumer.ConsumerConnector
,通过配置参数zookeeper.connect
来消费。这种情况,消费者的offset会更新到zookeeper的/kafka/consumers/<group.id>/offsets/<topic>/<partitionId>
,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。 - 因此kafka提供了另一种解决方案:增加__consumer_offsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。
7.3 创建消费者获取__consumer_offsets topic内容
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
public class KafkaCli {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "cdh003:9092");
//每个消费者分配独立的组号
props.put("group.id", "test_1");
//如果value合法,则自动提交偏移量
props.put("enable.auto.commit", "true");
//设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put("session.timeout.ms", "30000");
//该参数表示从头开始消费该主题
props.put("auto.offset.reset", "earliest");
//注意反序列化方式为ByteArrayDeserializer
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("__consumer_offsets"));
System.out.println("Subscribed to topic " + "__consumer_offsets");
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (ConsumerRecord<byte[], byte[]> record : records) {
// 这里直接将record的全部信息写到System.out打印流中
// GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
// formatter.writeTo(record, System.out);
//对record的key进行解析,注意这里的key有两种OffsetKey和GroupMetaDataKey
//GroupMetaDataKey中只有消费者组ID信息,OffsetKey中还包含了消费的topic信息
BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
if (key instanceof OffsetKey) {
GroupTopicPartition partition = (GroupTopicPartition) key.key();
String topic = partition.topicPartition().topic();
String group = partition.group();
System.out.println("group : " + group + " topic : " + topic);
System.out.println(key.toString());
} else if (key instanceof GroupMetadataKey) {
System.out.println("groupMetadataKey:------------ "+key.key());
}
//对record的value进行解析
OffsetAndMetadata om = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
//System.out.println(om.toString());
}
}
}
}
Key的解析输出信息如下
Value的解析输出信息如下
使用这种解析方式过一会儿就出现如下异常:
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'metadata': Error reading string of length 25970, only 12 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:72)
at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:958)
at kafka.coordinator.GroupMetadataManager.readOffsetMessageValue(GroupMetadataManager.scala)
at com.wangjian.KafkaCli.main(KafkaCli.java:68)
当key类型为GroupMetadataKey时,去解析OffsetAndMetadata会抛异常,应该用如下方法去解析GroupMetadata
if (key instanceof GroupMetadataKey) {
System.out.println("groupMetadataKey: " + key.key());
//第一个参数为group id,先将key转换为GroupMetadataKey类型,再调用它的key()方法就可以获得group id
GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
System.out.println("GroupMetadata: "+groupMetadata.toString());
}
输出如下
groupMetadataKey: test_1
GroupMetadata: [test_1,Some(consumer),Stable,Map(consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26 -> [consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26,consumer-1,/172.18.89.153,30000])]
可以看到GroupMetadata中保存了该消费者组中每个消费者的具体信息,包括了消费者所在IP等
使用formatter方式写到System.out输出信息如下
GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
formatter.writeTo(record, System.out);
这是完整的OffsetMessage信息,同时也可以看到__consumer_offsets topic的每一日志项的格式都是
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.4 总结
kafka存储到__consumer_offsets 的topic的元数据信息有两种
- key:OffsetKey —————————> value:OffsetAndMetadata 保存了消费者组各个partition的offset位移信息元数据
- key:GroupMetadataKey ————————> value:GroupMetadata 保存了消费者组中各个消费者的信息
在通过java 开发consumer去消费该topic获取元数据时,应注意区分这两种情况,还有反序列化方式。
对于spark streaming来说,其中DStream中,DStream的本质是RDD序列,读取kafka时,也就是KafkaRDD,通过读取KafkaRDD的getPartitions方法,可以发现,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的。
边栏推荐
- 使用jq实现全选 反选 和全不选-冯浩的博客
- 原生js实现全选和反选的功能 --冯浩的博客
- Some problems encountered in installing pytorch in windows11 CONDA
- js封装数组反转的方法--冯浩的博客
- 1005. Maximized array sum after K negations
- Research Report on market supply and demand and strategy of double drum magnetic separator industry in China
- (POJ - 1458) common subsequence (longest common subsequence)
- 875. 爱吃香蕉的珂珂 - 力扣(LeetCode)
- Codeforces Round #799 (Div. 4)A~H
- Codeforces round 797 (Div. 3) no f
猜你喜欢
Li Kou - 298th weekly match
(POJ - 3685) matrix (two sets and two parts)
读取和保存zarr文件
原生js实现全选和反选的功能 --冯浩的博客
力扣——第298场周赛
QT implementation window gradually disappears qpropertyanimation+ progress bar
分享一个在树莓派运行dash应用的实例。
Openwrt source code generation image
QT有关QCobobox控件的样式设置(圆角、下拉框,向上展开、可编辑、内部布局等)
MariaDB的安装与配置
随机推荐
Codeforces Round #803 (Div. 2)A~C
2027. Minimum number of operations to convert strings
QT realizes window topping, topping state switching, and multi window topping priority relationship
QT模拟鼠标事件,实现点击双击移动拖拽等
Spark的RDD(弹性分布式数据集)返回大结果集
Codeforces round 797 (Div. 3) no f
Calculate the time difference
力扣:第81场双周赛
Codeforces - 1526C1&&C2 - Potions
浏览器打印边距,默认/无边距,占满1页A4
Hbuilder X格式化快捷键设置
Summary of FTP function implemented by qnetworkaccessmanager
2078. Two houses with different colors and the farthest distance
807. Maintain the urban skyline
Market trend report, technological innovation and market forecast of desktop electric tools in China
Suffix expression (greed + thinking)
Log statistics (double pointer)
Candy delivery (Mathematics)
Discussion on QWidget code setting style sheet
Flask框架配置loguru日志库