Chapter 7: The __consumer_offsets topic
2022-07-06 16:34:00 【Can't keep the setting sun】
7.1 Basic structure
- version: version field of the message key; it differs between Kafka versions
- group: the consumer group's group.id; this field determines which partition of __consumer_offsets the message is written to
- topic: topic name
- partition: topic partition
- version: version field of the message value; it differs between Kafka versions
- offset: the position up to which this group has consumed this topic partition
- metadata: custom metadata
- commit_timestamp: time at which the offset was committed
- expire_timestamp: expiration time; a scheduled task cleans up messages once they have expired
- Query all contents of the __consumer_offsets topic
Note: before running the commands below, set exclude.internal.topics=false in consumer.properties.
Earlier versions (formatter class in the kafka.coordinator package):
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
Later versions, inclusive (formatter class in the kafka.coordinator.group package):
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- Get the offset information of a specified consumer group
First compute the partition that stores the group's messages:
Math.abs("test".hashCode()) % 50; // 48: offsets for the group id "test" are committed to partition 48 of __consumer_offsets
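The partition calculation above can be wrapped in a small helper. This is a minimal sketch, not Kafka's own code: the class and method names are hypothetical, and the partition count of 50 is assumed to match the broker's offsets.topic.num.partitions setting (50 is its default). Math.abs returns a negative number for Integer.MIN_VALUE, so the sketch guards that case explicitly.

```java
final class OffsetsPartitionDemo {

    /** Absolute value that never goes negative: Math.abs(Integer.MIN_VALUE) is still negative, so map it to 0. */
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Partition of __consumer_offsets expected to hold the records of the given group id (hypothetical helper). */
    static int partitionFor(String groupId, int numPartitions) {
        return safeAbs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // Same example as in the text: group id "test" with 50 partitions
        System.out.println(partitionFor("test", 50)); // prints 48
    }
}
```

The resulting number is the value to pass as --partition when inspecting a single partition of __consumer_offsets.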
- View the data structure of __consumer_offsets
// Kafka 0.11 and later (inclusive)
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition> --broker-list localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
// Before Kafka 0.11
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition <partition> --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
Each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
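To make the entry format concrete, the sketch below models one offset-commit record with plain Java classes and prints it in the same layout. The class names (OffsetEntryDemo, OffsetCommitKey, OffsetCommitValue) and the sample values are hypothetical; this is neither Kafka's internal code nor its formatter.

```java
final class OffsetEntryDemo {

    /** Key part: identifies which group, topic and partition the commit belongs to. */
    static final class OffsetCommitKey {
        final String group;
        final String topic;
        final int partition;

        OffsetCommitKey(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Value part: the committed offset plus its metadata and timestamps. */
    static final class OffsetCommitValue {
        final long offset;
        final String metadata;
        final long commitTimestamp;
        final long expireTimestamp;

        OffsetCommitValue(long offset, String metadata, long commitTimestamp, long expireTimestamp) {
            this.offset = offset;
            this.metadata = metadata;
            this.commitTimestamp = commitTimestamp;
            this.expireTimestamp = expireTimestamp;
        }
    }

    /** Renders key and value as [Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]. */
    static String format(OffsetCommitKey k, OffsetCommitValue v) {
        return "[" + k.group + ", " + k.topic + ", " + k.partition + "]::[OffsetMetadata["
                + v.offset + ", " + v.metadata + "], " + v.commitTimestamp + ", " + v.expireTimestamp + "]";
    }

    public static void main(String[] args) {
        OffsetCommitKey key = new OffsetCommitKey("test_1", "demo-topic", 0);
        OffsetCommitValue value = new OffsetCommitValue(42L, "none", 1000L, 2000L);
        System.out.println(format(key, value));
    }
}
```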
7.2 Kafka consumer offset storage
Kafka 0.9 introduced the new consumer and consumer group, and with them the mechanism for managing and storing offsets changed significantly. By default the new consumer no longer saves offsets to ZooKeeper; it saves them to the __consumer_offsets topic instead.
- If the consumer uses the Java API configured through the zookeeper.connect parameter, its offsets are written to ZooKeeper under /kafka/consumers/<group.id>/offsets/<topic>/<partitionId>. However, ZooKeeper is not well suited to high-volume reads and writes, writes in particular.
- Kafka therefore provides another solution: it adds the __consumer_offsets topic and writes offsets to it as messages, removing the dependence on ZooKeeper (for storing offsets, that is). Each message in __consumer_offsets records the offset committed by a consumer group at a given moment.
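Because each message records an offset committed by a group at a given moment, the current position of a group can be recovered by reading the messages in order and keeping the last value seen per (group, topic, partition). The sketch below illustrates that idea with an in-memory list standing in for the topic. The class names and sample records are hypothetical, and the code does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OffsetReplayDemo {

    /** One committed offset, as a stand-in for a message in __consumer_offsets. */
    static final class Commit {
        final String group;
        final String topic;
        final int partition;
        final long offset;

        Commit(String group, String topic, int partition, long offset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Replays commits in log order; a later commit for the same key overwrites the earlier one. */
    static Map<String, Long> replay(List<Commit> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Commit c : log) {
            latest.put(c.group + "/" + c.topic + "/" + c.partition, c.offset);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Commit> log = new ArrayList<>();
        log.add(new Commit("test_1", "orders", 0, 10));
        log.add(new Commit("test_1", "orders", 1, 7));
        log.add(new Commit("test_1", "orders", 0, 25)); // newer commit for partition 0
        System.out.println(replay(log)); // prints {test_1/orders/0=25, test_1/orders/1=7}
    }
}
```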
7.3 Creating a consumer to read the contents of the __consumer_offsets topic
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

public class KafkaCli {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh003:9092");
        // Consumer group id used by this consumer
        props.put("group.id", "test_1");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits, in milliseconds
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout: with no heartbeat within this time the consumer is considered dead and the group rebalances
        props.put("session.timeout.ms", "30000");
        // Consume the topic from the beginning when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        // Keys and values must be deserialized with ByteArrayDeserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("__consumer_offsets"));
        System.out.println("Subscribed to topic " + "__consumer_offsets");
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Alternative: write all of the record's information to the System.out print stream
                // GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
                // formatter.writeTo(record, System.out);
                // Parse the record key. There are two key types, OffsetKey and GroupMetadataKey:
                // GroupMetadataKey carries only the consumer group id; OffsetKey also carries the consumed topic.
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (key instanceof OffsetKey) {
                    GroupTopicPartition partition = (GroupTopicPartition) key.key();
                    String topic = partition.topicPartition().topic();
                    String group = partition.group();
                    System.out.println("group : " + group + " topic : " + topic);
                } else if (key instanceof GroupMetadataKey) {
                    System.out.println("groupMetadataKey:------------ " + key.key());
                }
                // Parse the record value (always as OffsetAndMetadata here, whatever the key type)
                OffsetAndMetadata om = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
            }
        }
    }
}
[Key parsing output]
[Value parsing output]
With this parsing approach, the following exception appears after a short while:
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'metadata': Error reading string of length 25970, only 12 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:72)
at kafka.coordinator.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:958)
at kafka.coordinator.GroupMetadataManager.readOffsetMessageValue(GroupMetadataManager.scala)
at com.wangjian.KafkaCli.main(KafkaCli.java:68)
When the key type is GroupMetadataKey, parsing the value as OffsetAndMetadata throws this exception. Such values should be parsed as GroupMetadata instead:
if (key instanceof GroupMetadataKey) {
    System.out.println("groupMetadataKey: " + key.key());
    // The first argument is the group id: cast the key to GroupMetadataKey, then call key() to get it
    GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
    System.out.println("GroupMetadata: " + groupMetadata.toString());
}
Output is as follows
groupMetadataKey: test_1
GroupMetadata: [test_1,Some(consumer),Stable,Map(consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26 -> [consumer-1-2b952983-41bd-4bdb-bc65-89ceedd91d26,consumer-1,/,30000])]
GroupMetadata holds the details of each consumer in the consumer group, including the IP address of the host the consumer runs on.
Alternatively, the formatter can write the record to System.out:
GroupMetadataManager.OffsetsMessageFormatter formatter = new GroupMetadataManager.OffsetsMessageFormatter();
formatter.writeTo(record, System.out);
This prints the complete offset message. It also shows that each log entry in the __consumer_offsets topic has the format
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
7.4 Summary
Kafka stores two kinds of metadata in the __consumer_offsets topic:
- key: OffsetKey -> value: OffsetAndMetadata, which stores the offset information of the consumer group for each partition
- key: GroupMetadataKey -> value: GroupMetadata, which stores information about each consumer in the consumer group
When developing a Java consumer that reads this topic to obtain metadata, take care to distinguish these two cases and to deserialize each value according to its key type.
For Spark Streaming, a DStream is essentially a sequence of RDDs; when reading from Kafka, each of those RDDs is a KafkaRDD. Reading the getPartitions method of KafkaRDD shows that each KafkaRDD partition corresponds to the data from o.fromOffset to o.untilOffset of one Kafka topic partition. In other words, KafkaRDD partitions and Kafka partitions map one-to-one.
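The one-to-one mapping can be pictured with a small stand-in for an offset range. The sketch below is plain Java with hypothetical names (KafkaRddPartitionDemo, Range); it only mimics the fromOffset/untilOffset idea described above and is not Spark's implementation. It assumes untilOffset is exclusive, so a range holds untilOffset - fromOffset messages.

```java
import java.util.ArrayList;
import java.util.List;

final class KafkaRddPartitionDemo {

    /** Hypothetical stand-in for one offset range of a single Kafka topic partition. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;  // inclusive
        final long untilOffset; // exclusive (assumption of this sketch)

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        long count() {
            return untilOffset - fromOffset;
        }
    }

    /** Produces one description per range: RDD partition i is backed by exactly one Kafka partition. */
    static List<String> describePartitions(List<Range> ranges) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < ranges.size(); i++) {
            Range r = ranges.get(i);
            out.add("rdd partition " + i + " <- " + r.topic + "-" + r.partition
                    + " [" + r.fromOffset + ", " + r.untilOffset + ") count=" + r.count());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Range> ranges = new ArrayList<>();
        ranges.add(new Range("orders", 0, 100, 150));
        ranges.add(new Range("orders", 1, 40, 45));
        for (String line : describePartitions(ranges)) {
            System.out.println(line);
        }
    }
}
```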