当前位置:网站首页>Pulsar 生产者
Pulsar 生产者
2022-06-13 11:53:00 【swadian2008】
目录
生产者(producer)是一个附加主题(topic)并把消息(messages)发布到 Pulsar broker 的程序,Pulsar broker 会处理接收到的消息(messages)。
1.消息发送方式
生产者(Producers)发送消息(messages)到 brokers 可以是同步的(sync),也可以是异步的(async)
| Mode | Description |
|---|---|
Sync send 同步发送 | The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure. // 生产者发送的每一条消息都需要等待broker的确认,如果没有收到确认,生产者认为此次消息发送失败 |
Async send 异步发送 | The producer puts a message in a blocking queue and returns immediately. The client library sends the message to the broker in the background. If the queue is full (you can configure the maximum size), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer. // 生产者将消息放入阻塞队列并立即返回。客户端在后台将消息发送给代理。如果队列已满(可以配置大小),则根据传递给生产者的参数,生产者会被阻止或在调用 API 时立即失败。 |
2.访问模式
对于生产者的主题,有以下不同类型的访问模式。
| Access mode | Description |
|---|---|
| Multiple producers can publish on a topic. // 默认配置 |
| Only one producer can publish on a topic. // 当一个生产者因为网络中断连接,broker 会选择一个新的 producer 成为专用生产者 |
| Only one producer can publish on a topic. // 如果已经有生产者连接,其他生产者将被移除并且立即失效 |
等待独占 | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the // 成功获取独占访问权的生产者将被视为一个leader,因此,如果你想为应用程序实现一个leader选举机制,可以选用该访问模式。需要注意的是,leader机制涉及到Pulsar的WAL日志,leader会把”决策“写入到主题。错误情况,当leader写入消息失败,会得到broker的通知,通知该生产者将不再是一个leader。 |
注意事项
一旦应用程序成功创建了Exclusive或WaitForExclusive访问模式的生产者,此应用程序的实例需要保证是主题的唯一编写器。任何其他试图生成有关此主题的消息的生产者都会立即出错或者必须等待,直到他们获得独占的访问权。
3.消息压缩
可以压缩生产者在传输过程中发布的消息。Pulsar目前支持以下类型的压缩:
- LZ4:LZ算法系列的一种,号称是目前最快的压缩算法之一
- ZLIB:zlib是用于数据压缩的一个简单的库,仅支持LZ77的变种算法
- ZSTD:Facebook开源的新无损压缩算法,优点是压缩率和压缩/解压缩性能都很突出
- SNAPPY:提供高速压缩速度和合理的压缩率。Snappy 比 zlib 更快,但文件相对要大 20% 到 100%。
压缩的通用原理:假如当前位置的一个字符串序列,在以前的历史数据中也出现过,那么现在用一种特殊的格式或者特殊的小序列来表示它,就可以起到压缩的效果,因为特殊格式或者特殊小序列通常都比原本的字符串序列更小。
4.批量消息
启用批处理后,生产者(producer)在单个请求中累积并发送一批消息(messages)。批量大小由最大消息数和最大发布延迟决定。因此,backlog大小表示批量的大小,而不是消息的大小。
在Pulsar中,批量消息作为单个单元而不是单个消息进行跟踪和存储。消费者需要将批量消息拆分为单个的消息。但是,即使启用了批处理,scheduled messages(通过 deliverAt 或 deliverAfter 参数配置)也始终作为单独的消息发送。
通常,当批量中的所有消息都被消费者确认时,该批量也会被确认。但是,当批量中有消息没有被确认,或者出现意外的失败,否定的确认以及确认超时会导致批量中的所有消息都重新的传递。
为了避免将批量中已确认的消息重新发送给消费者,Pulsar从Pulsar 2.6.0开始引入批量索引确认机制。启用批量索引确认机制后,消费者会过滤出已经确认过的批量索引,并将批量索引确认请求发送给代理(broker)。代理(broker)会维护和跟踪每个批量索引的确认状态,并避免向消费者(consumer)发送已确认过的消息。当批量中的所有消息都被确认后,该批量将会被删除。
默认情况下,禁用批量索引确认机制(AcknowledgementAtBatchIndexLevelEnabled=false)。但是可以通过在代理端(broker)将AcknowledgementAtBatchIndexLevelEnabled参数设置为true来启用批量索引确认机制。启用批量索引确认机制会导致更多内存开销。
5.消息分块
消息分块,使Pulsa的生产者和消费者都能处理大型有效负载消息(在生产者端将消息分块,在消费端将消息聚合)。
启用消息分块后,当消息(messages)大小超过允许的最大负载大小(broker的maxMessageSize参数)时,消息的工作流如下所示:
- 生产者(producer)将原始消息拆分为分块消息,并将它们与分块元数据一起按顺序单独发布到代理。
- 代理(broker)以与普通消息相同的方式将分块消息存储在一个managed-ledger(托管账本)中,并使用chunkedMessageRate参数记录该主题上分块消息的速率。
- 消费者(consumer)会缓存分块的消息,并在接收到消息的所有分块时将其聚合到接收队列中。
- 客户端(client)消费接收队列中的聚合消息。
消息分块的限制:
- 只能用于持久化主题
- 只能用于独占(exclusive)访问和容错订阅(failover subscription)类型。
- 不能和批处理同时使用
处理连续的分块消息
下图展示了处理连续分块消息的过程。图中,生产者依次往一个主题中发布分块消息(大型消息)和非分块消息(常规消息M3\M4)。生产者(producer)在发布M1消息时,把M1分成了M1-C1、M1-C2和M1-C3三个块消息。broker 端会存储所有的分块消息(放在managed-ledger中),并把他们按照相同的顺序发送给消费者。消费者(consumer)会在内存中缓存接收到的分块消息,直到全部接收,然后把它们聚合为原始的消息M1,最后将原始的消息M1移交给客户端(client)。

处理不连续的分块消息
当多个生产者将分块消息发布到单个主题中时,代理(broker)将来自不同生产者的所有分块消息都存储在同一个托管账本(managed-ledger)中。托管账本中的分块消息相互交错。如下所示,生产者1将消息M1分为三个数据块M1-C1、M1-C2和M1-C3进行发布。生产者2将消息M2也分为三个块M2-C1、M2-C2和M2-C3进行发布。但是特定消息的分块消息仍处于有序状态,虽然他们在托管分类中可能不是连续的。

注意事项
在这种情况下,交错的分块消息可能会给消费者带来一些内存压力,因为消费者为每个大型消息都保留了单独的缓冲区,以便将其所有的分块消息聚合成一条消息。通过配置maxPendingChunkedMessage参数,可以限制消费者并发维护的最大分块消息数量。当维护量达到数量阈值时,消费者会暂时丢弃这些消息,随后不发送消息确认,或者要求代理重传来进行消息补偿,从而优化内存利用率。
启用消息分块
前提条件:通过将enableBatching参数设置为false来禁用批量处理。
默认情况下,消息分块功能处于关闭状态。要启用消息分块,请在创建生产者时将chunkingEnabled参数设置为true。
注意事项
如果消费者未能在指定的时间段内收到消息的所有分块,那么不完整的分块消息将过期。过期的时间默认值为1分钟。有关ExpireTimeofCompletechUnkedMessage参数的更多信息,请参阅org。
边栏推荐
- How camunda uses script script nodes
- (幼升小信息-03)批量模板制作 幼儿基本信息收集文件夹(包含PDF、Word、证件文件夹)
- Socket programming (medium)
- Wallys/Network_ Card/DR-NAS26/AR9223/2x2 MIMO
- Based on STM32F103 - DS1302 date time + serial port printing
- [tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (I)
- [tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (II)
- VSCode 如何将已编辑好的文件中的 tab 键转换成空格键
- Envoyer un SMS - système de carte d'accès intelligent basé sur stm32f103 + as608 module d'empreintes digitales + clé matricielle 4x4 + sim900a
- [tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (III)
猜你喜欢

CPU的分支预测

About SAP Spartacus cmsservice Possible optimization ideas for getcomponentdata
![[tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (I)](/img/1b/92cbe7050580a0124a82f70dd3ca21.png)
[tcapulusdb knowledge base] Introduction to tcapulusdb tcapsvrmgr tool (I)

UE4,UE5虚幻引擎,Command Console控制台命令,参数集

文本纠错--CRASpell模型

基于STM32F103——AS608指纹模块+串口打印
![[tcapulusdb knowledge base] tcapulusdb doc acceptance - create business introduction](/img/a4/c3255ce17516348f703f7f21511555.png)
[tcapulusdb knowledge base] tcapulusdb doc acceptance - create business introduction

基于STM32F103——DS1302日期时间+串口打印

How camunda uses script script nodes

How to use dataX to update the data in the downstream Oracle database with the update semantics?
随机推荐
2022年二建《公路》科目答案已出,请收好
【TcaplusDB知识库】TcaplusDB分析型文本导出介绍
break algorithm---dynamic planning(dp-func)
『忘了再学』Shell基础 — 30、sed命令的使用
Review guide for students
Web development project, web single page development
【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(二)
轻量级实时语义分割:ENet & ERFNet
Details of fitfi sports money making chain game system development mode
[SQL statement basics] - select (supplement to single table query sequence)
[tcapulusdb knowledge base] tcapulusdb doc acceptance - Introduction to creating game area
Process of manually encrypting and burning the mass production firmware of ESP equipment
(small information for children to children-03) batch template production of basic information collection folder for children (including PDF, word and certificate folder)
面试题 Mysql 数据库
Kubernetes deploying ActiveMQ
Show/exec and close/hide of QT form are not executed when calling the close destructor
【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(三)
Committed to R & D and manufacturing of ultra surface photonic chip products, Shanhe optoelectronics completed a round of pre-A financing of tens of millions of yuan
面试突击56:聚簇索引和非聚簇索引有什么区别?
[tcapulusdb knowledge base] Introduction to tcapulusdb analytical text export