Chapter 7: The __consumer_offsets topic
2022-07-06 16:34:00 【Can't keep the setting sun】
7.1 Basic structure
key
- version: version field; it differs between Kafka versions
- group: the consumer group's group.id; this field determines which partition of __consumer_offsets the message is written to
- topic: topic name
- partition: topic partition
value
- version: version field; it differs between Kafka versions
- offset: the position this group has consumed up to in this topic partition
- metadata: custom metadata
- commit_timestamp: time at which the offset was committed
- expire_timestamp: expiration time; a scheduled task cleans up messages once they have expired
- Query all contents of the __consumer_offsets topic
Note: before running the commands below, set exclude.internal.topics=false in consumer.properties
Versions before 0.11.0.0
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
Version 0.11.0.0 and later
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- Get the offset information of a specified consumer group
Partition where the messages are stored
Math.abs("test".hashCode()) % 50; // 48: the offset records of group.id "test" are committed to partition 48 of __consumer_offsets
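The partition calculation above can be wrapped in a small helper. This is a minimal sketch, not broker code: the class and method names are hypothetical, the value 50 is taken from the formula above (it corresponds to the usual default partition count of __consumer_offsets), and the guard for Integer.MIN_VALUE is an addition made here because Math.abs returns a negative number for that input.

```java
final class OffsetsPartitioner {
    // Hypothetical helper: maps a group.id to a partition of __consumer_offsets
    // using the same formula as the text, Math.abs(hashCode) % numPartitions.
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Assumption added here: Math.abs(Integer.MIN_VALUE) is still negative,
        // so that single value is mapped to 0 to keep the result non-negative.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // "test" is the group.id used in the text; 50 is the partition count it assumes.
        System.out.println(partitionFor("test", 50)); // prints 48
    }
}
```

The resulting number is the value to pass to --partition when inspecting a single group's offset records.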
- View the data structure of __consumer_offsets
// Kafka 0.11 and later
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition> --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
// Before Kafka 0.11
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition> --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
Here <partition> is the partition number to read. Each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.2 Kafka consumer offset storage
Kafka 0.9 introduced the new consumer and consumer group, and the way offsets are managed and stored changed substantially. By default, the new consumer no longer saves offsets to ZooKeeper but to the __consumer_offsets topic.
- If a consumer uses the Java API kafka.javaapi.consumer.ConsumerConnector and is configured through the zookeeper.connect parameter, its offsets are updated in ZooKeeper under /kafka/consumers/<group.id>/offsets/<topic>/<partitionId>. However, ZooKeeper is not well suited to high-volume reads and writes, especially writes.
- Kafka therefore provides another solution: the __consumer_offsets topic. Offsets are written to this topic, which removes the dependence on ZooKeeper (as far as storing offsets is concerned). The messages in __consumer_offsets record the offsets that each consumer group committed at a given moment.
7.3 Creating a consumer to read the contents of the __consumer_offsets topic
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

public class KafkaCli {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh003:9092");
        // Give this consumer its own group id
        props.put("group.id", "test_1");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // How often (in ms) the consumed offsets are committed
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout (in ms): without a heartbeat within this time the consumer is considered dead
        props.put("session.timeout.ms", "30000");
        // Start from the earliest offset when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        // Note that keys and values are deserialized with ByteArrayDeserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("__consumer_offsets"));
        System.out.println("Subscribed to topic " + "__consumer_offsets");
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // The two lines below would write all of the record's information to System.out
                // GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
                // formatter.writeTo(record, System.out);
                // Parse the record's key; there are two kinds of key, OffsetKey and GroupMetadataKey.
                // A GroupMetadataKey holds only the consumer group id; an OffsetKey also holds the consumed topic.
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (key instanceof OffsetKey) {
                    GroupTopicPartition partition = (GroupTopicPartition) key.key();
                    String topic = partition.topicPartition().topic();
                    String group = partition.group();
                    System.out.println("group : " + group + " topic : " + topic);
                    System.out.println(key.toString());
                } else if (key instanceof GroupMetadataKey) {
                    System.out.println("groupMetadataKey:------------ " + key.key());
                }
                // Parse the record's value
                OffsetAndMetadata om = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                //System.out.println(om.toString());
            }
        }
    }
}
This prints the parsed key and value of each record. With this parsing approach, however, the following exception appears after a while:
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'metadata': Error reading string of length 25970, only 12 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:72)
at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:958)
at kafka.coordinator.GroupMetadataManager.readOffsetMessageValue(GroupMetadataManager.scala)
at com.wangjian.KafkaCli.main(KafkaCli.java:68)
When the key is a GroupMetadataKey, parsing the value as OffsetAndMetadata throws an exception. In that case the value should be parsed as GroupMetadata instead, as follows:
if (key instanceof GroupMetadataKey) {
    System.out.println("groupMetadataKey: " + key.key());
    // The first argument is the group id: cast the key to GroupMetadataKey, then call its key() method to get the group id
    GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
    System.out.println("GroupMetadata: " + groupMetadata.toString());
}
The output is as follows
groupMetadataKey: test_1
GroupMetadata: [test_1,Some(consumer),Stable,Map(consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26 -> [consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26,consumer-1,/172.18.89.153,30000])]
As the output shows, GroupMetadata stores the details of each consumer in the consumer group, including the consumer's IP address.
Alternatively, the formatter can write each record to System.out:
GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
formatter.writeTo(record, System.out);
This prints the complete offset message. Each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.4 Summary
Kafka stores two kinds of metadata in the __consumer_offsets topic:
- key: OffsetKey -> value: OffsetAndMetadata, which stores the committed offset metadata for each partition consumed by a consumer group
- key: GroupMetadataKey -> value: GroupMetadata, which stores information about each consumer in the consumer group
When developing a Java consumer that reads metadata from this topic, take care to distinguish these two cases and to deserialize each one accordingly.
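To illustrate how the two key types can be told apart without the broker classes, here is a minimal sketch that reads only the key bytes with java.nio. It rests on assumptions that are not stated in the text above: that the key starts with a 2-byte version, that versions 0 and 1 denote an OffsetKey (group, topic, partition) and version 2 a GroupMetadataKey (group only), and that strings are stored as a 2-byte length followed by UTF-8 bytes. The class, its methods, and the sample topic name "orders" are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetsKeySketch {
    // Reads a string stored as a 2-byte length followed by UTF-8 bytes (assumed layout).
    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Writes a string in the same assumed layout; used only to build sample keys.
    static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    // Decides the key type from the leading version field (assumed: 0/1 = offset key, 2 = group key).
    static String describeKey(ByteBuffer buf) {
        short version = buf.getShort();
        if (version == 0 || version == 1) {
            String group = readString(buf);
            String topic = readString(buf);
            int partition = buf.getInt();
            return "OffsetKey[" + group + "," + topic + "," + partition + "]";
        } else if (version == 2) {
            return "GroupMetadataKey[" + readString(buf) + "]";
        }
        return "UnknownKey[version=" + version + "]";
    }

    // Builds a sample offset key so the sketch runs without a broker.
    static ByteBuffer sampleOffsetKey(String group, String topic, int partition) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        buf.putShort((short) 1);
        writeString(buf, group);
        writeString(buf, topic);
        buf.putInt(partition);
        buf.flip();
        return buf;
    }

    // Builds a sample group metadata key so the sketch runs without a broker.
    static ByteBuffer sampleGroupKey(String group) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        buf.putShort((short) 2);
        writeString(buf, group);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(describeKey(sampleOffsetKey("test_1", "orders", 3)));
        System.out.println(describeKey(sampleGroupKey("test_1")));
    }
}
```

In a real consumer the buffer would come from ByteBuffer.wrap(record.key()), and the result would decide whether the value is parsed as OffsetAndMetadata or as GroupMetadata.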
For Spark Streaming, a DStream is essentially a sequence of RDDs; when reading from Kafka, these are KafkaRDDs. Reading KafkaRDD's getPartitions method shows that each KafkaRDD partition corresponds to the data from o.fromOffset to o.untilOffset in one partition of a Kafka topic. In other words, KafkaRDD partitions and Kafka partitions correspond one to one.