Chapter 7: The __consumer_offsets topic
7.1 The basic structure

key
- version: schema version; differs across Kafka versions
- group: the consumer group's group.id; this field determines which partition of __consumer_offsets the message is written to
- topic: topic name
- partition: topic partition
value
- version: schema version; differs across Kafka versions
- offset: how far this group has consumed in this topic partition
- metadata: custom metadata
- commit_timestamp: the time the offset was committed
- expire_timestamp: the expiration time; once an entry expires, a scheduled task cleans it up
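To make this layout concrete, here is a minimal sketch in plain Java of the logical fields of one offset message. The field names follow the list above, but the class itself is purely illustrative and is not Kafka's actual implementation class.

// Illustrative model of one __consumer_offsets message (not Kafka's real class).
public class OffsetMessage {
    // key: identifies whose offset this is
    short keyVersion;     // schema version
    String group;         // group.id; also selects the __consumer_offsets partition
    String topic;
    int partition;

    // value: the committed offset plus bookkeeping
    short valueVersion;   // schema version
    long offset;          // position the group has consumed up to
    String metadata;      // custom metadata
    long commitTimestamp;
    long expireTimestamp; // expired entries are purged by a scheduled task
}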
- Query all contents of the __consumer_offsets topic
Note: before running the commands below, you must set exclude.internal.topics=false in consumer.properties.
Versions before 0.11.0.0
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
Version 0.11.0.0 and later
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- Get the offset information of a specified consumer group
Which partition the messages are stored in:
Math.abs("test".hashCode()) % 50; // = 48: the offsets committed by the group id "test" go to partition 48 of __consumer_offsets
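As a quick check, here is a minimal runnable sketch of that calculation. The constant 50 is the default value of offsets.topic.num.partitions; adjust it if your cluster overrides that setting.

public class OffsetsPartitionCalculator {
    // Which __consumer_offsets partition holds this group's offsets
    static int partitionFor(String groupId, int numPartitions) {
        return Math.abs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("test", 50)); // prints 48
    }
}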
- View the data structure of __consumer_offsets
// Kafka 0.11 and later
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partitionId> --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
// Before Kafka 0.11
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partitionId> --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
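For example, to read the offsets of the group "test" from the calculation above (partition 48), assuming a Kafka 0.11+ broker at localhost:9092 (swap in the pre-0.11 formatter class on older brokers):

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 48 --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"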
You can see that each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
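For illustration, a hypothetical entry for group "test" consuming partition 0 of a topic named my-topic (both names and all numbers are placeholders) might look like:

[test,my-topic,0]::[OffsetMetadata[5213,NO_METADATA],CommitTime 1657089240000,ExpirationTime 1657175640000]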
7.2 Kafka consumer offset storage
Starting with Kafka 0.9, a new consumer and consumer-group design was introduced, and the mechanism for managing and persisting offsets changed substantially: by default, the new consumer no longer saves offsets to ZooKeeper but to the __consumer_offsets topic.
- If a consumer uses the old Java API (kafka.javaapi.consumer.ConsumerConnector), configured through the zookeeper.connect parameter, its offsets are written to ZooKeeper under /kafka/consumers/<group.id>/offsets/<topic>/<partitionId>. ZooKeeper, however, is not well suited to heavy read/write traffic, writes in particular.
- Kafka therefore provides an alternative: the __consumer_offsets topic. Offsets are written to this topic, removing the dependence on ZooKeeper (for offset storage, that is). Each message in __consumer_offsets records the offset committed by a consumer group at a given moment.
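Below is a minimal sketch of the new-consumer path, assuming a broker at localhost:9092 and a topic named my-topic (both placeholders): the commitSync() call writes the offset into __consumer_offsets rather than into ZooKeeper.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitToOffsetsTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // placeholder group
        props.put("enable.auto.commit", "false");         // commit manually below
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
            // With the new consumer, this commit lands in __consumer_offsets,
            // not in ZooKeeper.
            consumer.commitSync();
        }
    }
}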
7.3 Creating a consumer to read the contents of the __consumer_offsets topic
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

public class KafkaCli {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh003:9092");
        // Use a dedicated group id for this consumer
        props.put("group.id", "test_1");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // How often to commit the offsets of consumed messages
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout: without a heartbeat within this window, the broker
        // considers the consumer dead
        props.put("session.timeout.ms", "30000");
        // Consume the topic from the beginning
        props.put("auto.offset.reset", "earliest");
        // Note: both deserializers must be ByteArrayDeserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("__consumer_offsets"));
        System.out.println("Subscribed to topic " + "__consumer_offsets");
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Alternatively, write the whole record to System.out:
                // GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
                // formatter.writeTo(record, System.out);

                // Parse the record's key. There are two key types: OffsetKey and GroupMetadataKey.
                // GroupMetadataKey carries only the consumer group id; OffsetKey also carries the consumed topic/partition.
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (key instanceof OffsetKey) {
                    GroupTopicPartition partition = (GroupTopicPartition) key.key();
                    String topic = partition.topicPartition().topic();
                    String group = partition.group();
                    System.out.println("group : " + group + " topic : " + topic);
                    System.out.println(key.toString());
                } else if (key instanceof GroupMetadataKey) {
                    System.out.println("groupMetadataKey:------------ " + key.key());
                }
                // Parse the record's value (this parsing is refined below)
                OffsetAndMetadata om = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                //System.out.println(om.toString());
            }
        }
    }
}
The parsed key output and the parsed value output appeared as console screenshots in the original post (omitted here).
With this parsing approach, the following exception appears after a while:
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'metadata': Error reading string of length 25970, only 12 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:72)
at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:958)
at kafka.coordinator.GroupMetadataManager.readOffsetMessageValue(GroupMetadataManager.scala)
at com.wangjian.KafkaCli.main(KafkaCli.java:68)
When the key type is GroupMetadataKey, parsing the value as OffsetAndMetadata throws this exception; such records must instead be parsed as GroupMetadata, as follows:
if (key instanceof GroupMetadataKey) {
    System.out.println("groupMetadataKey: " + key.key());
    // The first argument is the group id: cast the key to GroupMetadataKey,
    // then call key() on it to obtain the group id
    GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
    System.out.println("GroupMetadata: " + groupMetadata.toString());
}
The output is as follows:
groupMetadataKey: test_1
GroupMetadata: [test_1,Some(consumer),Stable,Map(consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26 -> [consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26,consumer-1,/172.18.89.153,30000])]
As you can see, GroupMetadata holds the details of every consumer in the consumer group, including each consumer's IP address and more.
Using the formatter to write the records to System.out:
GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
formatter.writeTo(record, System.out);

The output (a screenshot in the original post) is the complete OffsetMessage information; it also confirms that each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.4 Summary
Kafka stores two kinds of metadata in the __consumer_offsets topic:
- key: OffsetKey ──> value: OffsetAndMetadata, which saves the offset of a consumer group for each partition, plus its metadata
- key: GroupMetadataKey ──> value: GroupMetadata, which saves the information of every consumer in the consumer group
When developing a Java consumer that reads this topic to obtain the metadata, take care to distinguish these two cases, and to use the right deserialization for each.
As for Spark Streaming: a DStream is essentially a sequence of RDDs, and when reading from Kafka the concrete type is KafkaRDD. Reading KafkaRDD's getPartitions method shows that each KafkaRDD partition holds exactly the data from o.fromOffset to o.untilOffset of one Kafka topic partition; in other words, KafkaRDD partitions map one-to-one onto Kafka partitions.
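To see that mapping in code, here is a minimal sketch using the spark-streaming-kafka-0-10 Java API; the broker address, group id, and topic name are placeholders. Each OffsetRange printed inside foreachRDD covers [fromOffset, untilOffset) of exactly one Kafka partition.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class OffsetRangeDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("offset-range-demo").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "offset-range-demo");       // placeholder group

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("my-topic"), kafkaParams)); // placeholder topic

        stream.foreachRDD(rdd -> {
            // One OffsetRange per RDD partition: the one-to-one mapping
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : ranges) {
                System.out.printf("%s-%d: fromOffset=%d untilOffset=%d%n",
                        o.topic(), o.partition(), o.fromOffset(), o.untilOffset());
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}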