Chapter 7: The __consumer_offsets topic
7.1 The basic structure

key
- version: version field; differs across Kafka versions
- group: the consumer group's group.id; this field determines which partition of __consumer_offsets the message is written to
- topic: topic name
- partition: topic partition
value
- version: version field; differs across Kafka versions
- offset: how far this group has consumed this topic partition
- metadata: custom metadata information
- commit_timestamp: the time at which the consumer offset was committed
- expire_timestamp: the expiration time; once the data expires, a scheduled task cleans up the expired messages
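For a concrete (entirely hypothetical) example: an offset committed by group test_1 for partition 0 of a topic my-topic would be rendered by the offsets formatter used later in this chapter roughly as:
[test_1,my-topic,0]::[OffsetMetadata[42,NO_METADATA],CommitTime 1657093640000,ExpirationTime 1657180040000]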
- Querying the full contents of the __consumer_offsets topic
Note: before running the commands below, you must first set exclude.internal.topics=false in consumer.properties.
Versions before 0.11.0.0:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
Version 0.11.0.0 and later (inclusive):
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- Getting the offset information of a specified consumer group
Which partition stores the message:
Math.abs("test".hashCode()) % 50; // 48: offsets committed by group id "test" go to partition 48 of __consumer_offsets (a runnable sketch of this calculation appears after this list)
- Viewing the data structure of __consumer_offsets
// Kafka 0.11 and later (inclusive)
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition number> --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
// before Kafka 0.11
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition number> --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
As you can see, each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
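As referenced above, here is a minimal runnable sketch of the group-to-partition calculation. It assumes the default offsets.topic.num.partitions of 50; adjust if your cluster overrides it.

public class OffsetsPartitionCalc {
    public static void main(String[] args) {
        int numPartitions = 50; // default value of offsets.topic.num.partitions
        String groupId = "test";
        // The group id's hash, taken modulo the partition count, picks the partition
        int partition = Math.abs(groupId.hashCode()) % numPartitions;
        System.out.println("Offsets for group '" + groupId + "' go to partition " + partition); // prints 48
    }
}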
7.2 Kafka consumer offset storage
Kafka 0.9 introduced the new consumer and consumer group, and the offset management and storage mechanism changed significantly: by default, the new consumer no longer saves offsets in ZooKeeper but in the __consumer_offsets topic.
- If the consumer uses the old Java API (kafka.javaapi.consumer.ConsumerConnector, configured to consume via the zookeeper.connect parameter), its offsets are written to the ZooKeeper path /kafka/consumers/<group.id>/offsets/<topic>/<partitionId>. ZooKeeper, however, is not well suited to heavy read/write traffic, especially writes.
- Kafka therefore provides another solution: the __consumer_offsets topic. Offsets are written to this topic, removing the dependency on ZooKeeper (for offset storage, that is). Each message in __consumer_offsets records the offset committed by a consumer group at a given moment.
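To make this concrete, here is a minimal sketch of a new-API consumer whose commits end up in __consumer_offsets; the broker address, group id, and topic name are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitToOffsetsTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("enable.auto.commit", "false");         // commit manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder
            ConsumerRecords<String, String> records = consumer.poll(100);
            System.out.println("Polled " + records.count() + " records");
            // This commit is persisted as a message in __consumer_offsets,
            // keyed by (group, topic, partition) -- no ZooKeeper write involved
            consumer.commitSync();
        }
    }
}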
7.3 Writing a consumer to read the __consumer_offsets topic
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

public class KafkaCli {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh003:9092");
        // Give this consumer its own group id
        props.put("group.id", "test_1");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // How often to commit the offsets of consumed messages
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout: past this, the broker considers the consumer dead and rebalances
        props.put("session.timeout.ms", "30000");
        // Consume the topic from the beginning
        props.put("auto.offset.reset", "earliest");
        // Note: the deserializer must be ByteArrayDeserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("__consumer_offsets"));
        System.out.println("Subscribed to topic " + "__consumer_offsets");
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Write the whole record to the System.out print stream:
                // GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
                // formatter.writeTo(record, System.out);

                // Parse the record's key. Note that there are two kinds of key:
                // OffsetKey and GroupMetadataKey. GroupMetadataKey carries only the
                // consumer group id; OffsetKey also carries the consumed topic information.
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (key instanceof OffsetKey) {
                    GroupTopicPartition partition = (GroupTopicPartition) key.key();
                    String topic = partition.topicPartition().topic();
                    String group = partition.group();
                    System.out.println("group : " + group + " topic : " + topic);
                    System.out.println(key.toString());
                } else if (key instanceof GroupMetadataKey) {
                    System.out.println("groupMetadataKey:------------ " + key.key());
                }
                // Parse the record's value
                OffsetAndMetadata om = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                //System.out.println(om.toString());
            }
        }
    }
}
(The parsed output of the key, and then of the value, is shown as screenshots in the original post.)
After running for a while, this parsing approach throws the following exception:
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'metadata': Error reading string of length 25970, only 12 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:72)
    at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:958)
    at kafka.coordinator.GroupMetadataManager.readOffsetMessageValue(GroupMetadataManager.scala)
    at com.wangjian.KafkaCli.main(KafkaCli.java:68)
When the key is of type GroupMetadataKey, parsing the value as OffsetAndMetadata throws the exception above; the value should instead be parsed as GroupMetadata:
if (key instanceof GroupMetadataKey) {
    System.out.println("groupMetadataKey: " + key.key());
    // The first parameter is the group id: cast the key to GroupMetadataKey,
    // then call key() on it to obtain the group id
    GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
    System.out.println("GroupMetadata: " + groupMetadata.toString());
}
The output is as follows:
groupMetadataKey: test_1
GroupMetadata: [test_1,Some(consumer),Stable,Map(consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26 -> [consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26,consumer-1,/172.18.89.153,30000])]
As you can see, GroupMetadata stores the details of every consumer in the consumer group, including each consumer's IP address.
Using the formatter to write the records to System.out produces the following output:
GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
formatter.writeTo(record, System.out);

This is the complete offset message information; it again shows that each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.4 Summary
Kafka stores two kinds of metadata in the __consumer_offsets topic:
- key: OffsetKey -> value: OffsetAndMetadata, which stores the committed offset (plus metadata) of each topic partition consumed by a consumer group
- key: GroupMetadataKey -> value: GroupMetadata, which stores the information of each consumer in the consumer group
When developing a Java consumer to read this topic and extract the metadata, take care to distinguish these two cases, and to use the correct (byte-array) deserialization.
For Spark Streaming, a DStream is essentially a sequence of RDDs; when reading from Kafka these are KafkaRDDs. Reading KafkaRDD's getPartitions method shows that each KafkaRDD partition corresponds to the data from o.fromOffset to o.untilOffset of one Kafka topic partition; in other words, KafkaRDD partitions map one-to-one onto Kafka partitions.
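A minimal sketch (not from the original post) that makes this mapping visible, using the 0.8 direct-stream Java API; the broker address and topic name are placeholders:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class KafkaRddPartitions {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("KafkaRddPartitions");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder
        Set<String> topics = new HashSet<>(Arrays.asList("demo-topic")); // placeholder
        KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics)
            .foreachRDD(rdd -> {
                // One OffsetRange per Kafka partition, and one RDD partition per OffsetRange
                OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                System.out.println("RDD partitions: " + rdd.getNumPartitions()
                        + ", Kafka partitions: " + ranges.length);
                for (OffsetRange o : ranges) {
                    System.out.println(o.topic() + "-" + o.partition()
                            + " [" + o.fromOffset() + ", " + o.untilOffset() + ")");
                }
            });
        jssc.start();
        jssc.awaitTermination();
    }
}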