Configuration options for starting Pulsar clients (client, producer, consumer)
2022-07-29 04:41:00 【SparkSql】
Producer configuration
Reference code
// Build the producer
this.producer = client.newProducer(Schema.BYTES)
        .producerName(judgeValue("producerName"))
        .topic(judgeValue("topic"))
        .batchingMaxMessages(Integer.parseInt(judgeValue("batchingMaxMessages")))
        .batchingMaxPublishDelay(Long.parseLong(judgeValue("batchingMaxPublishDelay")), TimeUnit.MILLISECONDS)
        // Boolean.parseBoolean parses the configured string;
        // Boolean.getBoolean would look up a JVM system property instead
        .enableBatching(Boolean.parseBoolean(judgeValue("enableBatching")))
        .blockIfQueueFull(Boolean.parseBoolean(judgeValue("blockIfQueueFull")))
        .maxPendingMessages(Integer.parseInt(judgeValue("maxPendingMessages")))
        // Set the message send timeout (in seconds)
        .sendTimeout(Integer.parseInt(judgeValue("sendTimeoutTimeUnit")), TimeUnit.SECONDS)
        // Set a custom partition routing policy
        // .messageRoutingMode(MessageRoutingMode.CustomPartition)
        // .messageRouter(new MessageRouter() {
        //     @Override
        //     public int choosePartition(Message<?> message, TopicMetadata metadata) {
        //         return 0;
        //     }
        // })
        .create();
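The configuration values are all read as strings, so each one has to be converted before it reaches the builder. The sketch below shows one way to do that lookup and conversion. The class ConfigValues and its flag method are hypothetical, and judgeValue here is only a stand-in for the article's helper, whose real implementation is not shown. It also illustrates why Boolean.parseBoolean is the right call for a configured "true" or "false": Boolean.getBoolean reads a JVM system property with the given name rather than parsing the text.

```java
import java.util.Map;

final class ConfigValues {
    // Hypothetical stand-in for the configuration source behind judgeValue(...).
    private final Map<String, String> values;

    ConfigValues(Map<String, String> values) {
        this.values = values;
    }

    /** Returns the trimmed value for a key, or fails if it is missing or empty. */
    String judgeValue(String key) {
        String value = values.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing configuration value: " + key);
        }
        return value.trim();
    }

    /** Parses a configured flag; "true" in any letter case yields true. */
    boolean flag(String key) {
        return Boolean.parseBoolean(judgeValue(key));
    }

    public static void main(String[] args) {
        ConfigValues config = new ConfigValues(Map.of("enableBatching", "true"));
        // Parses the configured text itself.
        System.out.println(config.flag("enableBatching"));
        // Looks up a system property named "true", which is normally not set.
        System.out.println(Boolean.getBoolean("true"));
    }
}
```

With a helper like this, a missing key fails early with a clear message instead of surfacing later as a NumberFormatException or a silently disabled option.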
Data in the configuration file
serviceURL: "pulsar://192.168.100.13:6650"
producerName: "my-producer99"
#topic: "persistent://zhiwang3/whds3/ClientEnvReport"
topic: "persistent://zhiwang3/whds9/admin3"
batchingMaxMessages: "1024"
batchingMaxPublishDelay: "50"
enableBatching: "true"
blockIfQueueFull: "true"
maxPendingMessages: "512"
sendTimeoutTimeUnit: "86400"
batchingMaxPublishDelayTimeUnit: "10"
topicPartition_num: 4
# Send one message every 50 ms
messageNum: 600
Pulsar 2.3 supports multiple URLs; they can be configured in serviceURL.
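As a rough illustration, the Pulsar client documentation describes a multi-host service URL as a comma-separated list of host:port pairs after the scheme. The helper below builds such a string; the class ServiceUrls is hypothetical and the second broker address is made up for the example.

```java
import java.util.List;

final class ServiceUrls {
    /** Joins broker addresses ("host:port") into one comma-separated service URL. */
    static String multiHost(List<String> hostPorts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("At least one broker address is required");
        }
        return "pulsar://" + String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        // 192.168.100.14 is an assumed second broker, not taken from the article.
        String serviceUrl = multiHost(List.of("192.168.100.13:6650", "192.168.100.14:6650"));
        System.out.println(serviceUrl);
        // prints pulsar://192.168.100.13:6650,192.168.100.14:6650
    }
}
```

The resulting string would be used as the serviceURL value in the configuration file in place of a single address.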
Reference link
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerBuilder
Configuration items
* "topicName" : "persistent://public/pulsar-cluster/default/my-topic", // Topic name, made up of four parts: [topic type://tenant/namespace/topic name]
* "producerName" : "my-producer", // Producer name
* "sendTimeoutMs" : 30000, // Send timeout; default 30 s
* "blockIfQueueFull" : false, // Whether send operations block when the pending-message queue is full. Default false: when the queue is full, send operations fail immediately
* "maxPendingMessages" : 1000, // Maximum size of the queue holding messages that await acknowledgment from the broker. When the queue is full, blockIfQueueFull=true takes effect
* "maxPendingMessagesAcrossPartitions" : 50000, // Maximum number of pending messages across all partitions
* "messageRoutingMode" : "CustomPartition", // Message routing mode: CustomPartition; RoundRobinPartition, which cycles through the partitions; SinglePartition, which picks one partition at random. See http://pulsar.apache.org/docs/zh-CN/2.2.0/cookbooks-partitioned/
* "hashingScheme" : "JavaStringHash", // Hashing scheme used to choose the partition a given message is published to
* "cryptoFailureAction" : "FAIL", // Action the producer takes when encryption fails
* "batchingMaxPublishDelayMicros" : 1000, // Time window within which sent messages are batched; the default is 1 millisecond when batching is enabled
* "batchingMaxMessages" : 1000, // Maximum number of messages allowed in a batch
* "batchingEnabled" : true, // Whether automatic batching of messages is enabled for the producer
* "compressionType" : "NONE", // Compression type used by the producer
* "initialSequenceId" : null, // Baseline sequence ID for messages published by the producer
* "properties" : { } // Properties attached to the producer
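The interaction between maxPendingMessages and blockIfQueueFull can be pictured as a bounded queue of messages awaiting acknowledgment. The following is only an analogy built on java.util.concurrent, not Pulsar's internal implementation, and the class PendingQueueSketch is hypothetical: with blockIfQueueFull=false a send into a full queue fails at once, while with blockIfQueueFull=true it waits until an acknowledgment frees a slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class PendingQueueSketch {
    private final BlockingQueue<String> pending;
    private final boolean blockIfQueueFull;

    PendingQueueSketch(int maxPendingMessages, boolean blockIfQueueFull) {
        this.pending = new ArrayBlockingQueue<>(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    /** Queues a message that is waiting for a broker acknowledgment. */
    void send(String message) {
        if (!blockIfQueueFull) {
            // Fail fast: add() throws IllegalStateException when the queue is full.
            pending.add(message);
            return;
        }
        try {
            // Block until an acknowledgment frees a slot.
            pending.put(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for queue space", e);
        }
    }

    /** Simulates the broker acknowledging the oldest pending message. */
    void acknowledgeOldest() {
        pending.poll();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingQueueSketch producer = new PendingQueueSketch(2, false);
        producer.send("m1");
        producer.send("m2");
        try {
            producer.send("m3"); // queue already holds two pending messages
        } catch (IllegalStateException e) {
            System.out.println("queue full, send rejected");
        }
        producer.acknowledgeOldest();
        producer.send("m3"); // succeeds once a slot is free
        System.out.println("pending: " + producer.pendingCount());
    }
}
```

The demo uses the fail-fast mode only, because the blocking mode would wait forever without a second thread acknowledging messages.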
Client configuration
Reference code
this.client = PulsarClient.builder()
        .serviceUrl(judgeValue("serviceURL"))
        .enableTcpNoDelay(true)
        .build();
Data in the configuration file
serviceURL: "pulsar://192.168.100.13:6650"
Reference link
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/PulsarClient.html
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientBuilder.html
Configuration items
* "serviceUrl" : "pulsar://localhost:6650", // Address of the broker cluster
* "operationTimeoutMs" : 30000, // Operation timeout
* "statsIntervalSeconds" : 60, // Interval between statistics reports (default: 60 seconds). Statistics are enabled when the value is positive, and it should be at least 1 second
* "numIoThreads" : 1, // Number of threads used to handle connections to brokers (default: 1 thread)
* "numListenerThreads" : 1, // Number of threads used for message listeners (default: 1 thread)
* "connectionsPerBroker" : 1, // Maximum number of connections the client library opens to a single broker
* "useTcpNoDelay" : true, // Whether to use TCP no-delay on connections; the default is true. No-delay ensures that packets are sent to the network as soon as possible, which is critical for low-latency publishing. On the other hand, sending a large number of small packets may limit overall throughput
* "useTls" : false, // Enable TLS; use "pulsar+ssl://" in the service URL to enable it
* "tlsTrustCertsFilePath" : "", // Path to the trusted TLS certificate file
* "tlsAllowInsecureConnection" : false, // Whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
* "tlsHostnameVerificationEnable" : false, // Whether the client verifies the hostname when connecting to the broker over TLS
* "concurrentLookupRequest" : 5000, // Number of concurrent lookup requests allowed on each broker connection, to prevent broker overload
* "maxLookupRequest" : 50000, // Maximum number of lookup requests allowed on each broker connection, to prevent broker overload
* "maxNumberOfRejectedRequestPerConnection" : 50, // Maximum number of requests a broker may reject within a set time window (30 seconds). Beyond that, the current connection is closed and the client opens a new one, giving it a chance to connect to a different broker (default: 50)
* "keepAliveIntervalSeconds" : 30 // Keep-alive (heartbeat) interval, in seconds, for each client-broker connection
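Since TLS is selected through the "pulsar+ssl://" scheme of the service URL, a small check on the configured URL can catch a mismatch before the client is built. This is a minimal sketch using only java.net.URI; the class ServiceUrlScheme and the method usesTls are hypothetical names, and the broker host in the demo is made up.

```java
import java.net.URI;

final class ServiceUrlScheme {
    /** Returns true when the service URL uses the TLS scheme "pulsar+ssl". */
    static boolean usesTls(String serviceUrl) {
        String scheme = URI.create(serviceUrl).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Service URL has no scheme: " + serviceUrl);
        }
        return scheme.equalsIgnoreCase("pulsar+ssl");
    }

    public static void main(String[] args) {
        System.out.println(usesTls("pulsar://localhost:6650"));            // prints false
        // broker.example.com is an assumed host name for illustration.
        System.out.println(usesTls("pulsar+ssl://broker.example.com:6651")); // prints true
    }
}
```

A check like this could be used, for example, to require that tlsTrustCertsFilePath is set whenever the URL asks for TLS.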
Consumer configuration
Reference code
consumer = client.newConsumer(Schema.BYTES)
        .consumerName(CONSUME_NAME)
        .topic(topic)
        .subscriptionName(subsciptionName)
        .ackTimeout(ACKTIMEOUT, TimeUnit.SECONDS)
        .maxTotalReceiverQueueSizeAcrossPartitions(MTRQSAP)
        .subscriptionInitialPosition(subscriptionInitialPosition)
        .subscriptionType(sT)
        .subscribe();
Data in the configuration file
serviceURL: "pulsar://192.168.100.13:6650"
consumeName: "my-consume17"
#topic: "persistent://zhiwang3/whds3/ClientEnvReport"
topic: "persistent://zhiwang3/whds7/8_192.168.100.118-partition-0"
ackTimeOut: "360"
maxTotalReceiverQueueSizeAcrossPartitions: "10"
# Exclusive, Shared, or Failover
subscriptionType: "Exclusive"
receiveOneMessageWaitTime: "5"
waitMessageTime: "10"
subsciptionName: "my-consume-default10.role"
subscriptionInitialPosition: "Earliest"
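Values such as subscriptionType and subscriptionInitialPosition arrive as strings and have to be mapped to enum constants before they can be passed to the consumer builder. The sketch below shows one way to do that mapping. The nested enums are local stand-ins that mirror the constant names used in this configuration file, not the Pulsar classes themselves, and ConsumerSettings and parseEnum are hypothetical names.

```java
final class ConsumerSettings {
    // Local stand-ins mirroring the names used in the configuration file.
    enum SubscriptionType { Exclusive, Shared, Failover }
    enum InitialPosition { Latest, Earliest }

    /** Maps a configured string to an enum constant, ignoring case and padding. */
    static <E extends Enum<E>> E parseEnum(Class<E> type, String raw) {
        String wanted = raw.trim();
        for (E constant : type.getEnumConstants()) {
            if (constant.name().equalsIgnoreCase(wanted)) {
                return constant;
            }
        }
        throw new IllegalArgumentException("Unsupported " + type.getSimpleName() + ": " + raw);
    }

    public static void main(String[] args) {
        SubscriptionType type = parseEnum(SubscriptionType.class, "Exclusive");
        InitialPosition position = parseEnum(InitialPosition.class, "earliest");
        long ackTimeoutSeconds = Long.parseLong("360");
        System.out.println(type + " " + position + " " + ackTimeoutSeconds);
        // prints Exclusive Earliest 360
    }
}
```

Matching by name without regard to case is a design choice for this sketch; a plain Enum.valueOf call would reject "earliest" because it is case-sensitive.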
Reference link
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder
Configuration items
* "topicNames" : [ ], // Topics the consumer subscribes to
* "topicsPattern" : null, // Pattern for the topics this consumer subscribes to. It accepts a regular expression, which is compiled into a pattern internally, for example: "persistent://prop/use/ns-abc/pattern-topic-.*"
* "subscriptionName" : "my-subscription", // Subscription name of the consumer
* "subscriptionType" : "Exclusive", // Subscription type used when subscribing to the topic: Exclusive, Failover, or Shared
* "receiverQueueSize" : 3, // Size of the consumer's receive queue
* "acknowledgementsGroupTimeMicros" : 100000, // Time window over which the consumer groups acknowledgments
* "maxTotalReceiverQueueSizeAcrossPartitions" : 10, // Maximum total receiver queue size across partitions
* "consumerName" : "my-consumer", // Consumer name
* "ackTimeoutMillis" : 10000, // Timeout for unacknowledged messages
* "priorityLevel" : 0, // Priority level for consumers on a shared subscription; the broker gives higher-priority consumers preference when dispatching messages
* "cryptoFailureAction" : "FAIL", // Action the consumer takes when decryption fails
* "properties" : { }, // Properties attached to the consumer
* "readCompacted" : false, // If enabled, the consumer reads messages from the compacted topic rather than the topic's full message backlog
* "subscriptionInitialPosition" : "Latest", // Initial position of the subscription: Earliest starts from the first message; Latest starts from the last message
* "patternAutoDiscoveryPeriod" : 1, // Topic auto-discovery period for consumers that subscribe by pattern
* "subscriptionTopicsMode" : "PERSISTENT", // Which topics this consumer subscribes to: persistent topics, non-persistent topics, or both
* "deadLetterPolicy" : null // Dead letter policy for the consumer. By default, some messages may be redelivered many times. With the dead letter mechanism, each message has a maximum redelivery count; once a message exceeds it, the message is sent to the dead letter topic and acknowledged automatically. The mechanism is enabled by setting a dead letter policy
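The dead letter behavior described for deadLetterPolicy can be sketched as a per-message redelivery counter with a cap. This is a simplified, in-memory model for illustration only; it does not use the Pulsar API, and DeadLetterSketch, onFailure, and the message IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DeadLetterSketch {
    private final int maxRedeliverCount;
    private final Map<String, Integer> failures = new HashMap<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterSketch(int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
    }

    /**
     * Records a failed processing attempt.
     * Returns true if the message should be redelivered, or false if it has
     * exceeded the redelivery limit and was moved to the dead letter list.
     */
    boolean onFailure(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count > maxRedeliverCount) {
            failures.remove(messageId);
            deadLetters.add(messageId);
            return false;
        }
        return true;
    }

    List<String> deadLetters() {
        return List.copyOf(deadLetters);
    }

    public static void main(String[] args) {
        DeadLetterSketch policy = new DeadLetterSketch(2);
        System.out.println(policy.onFailure("msg-1")); // first failure: redeliver
        System.out.println(policy.onFailure("msg-1")); // second failure: redeliver
        System.out.println(policy.onFailure("msg-1")); // third failure: dead letter
        System.out.println(policy.deadLetters());
    }
}
```

In this model a message with a limit of 2 is redelivered twice and then set aside, which keeps a message that always fails from being retried without end.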