当前位置:网站首页>Pulsar 生产者

Pulsar 生产者

2022-06-13 11:53:00 swadian2008

目录

1.消息发送方式

2.访问模式

3.消息压缩

4.批量消息

5.消息分块


生产者(producer)是一个附加主题(topic)并把消息(messages)发布到 Pulsar broker 的程序,Pulsar broker 会处理接收到的消息(messages)。

1.消息发送方式

生产者(Producers)发送消息(messages)到 brokers 可以是同步的(sync),也可以是异步的(async)

ModeDescription

Sync send

同步发送

The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure.

// 生产者发送的每一条消息都需要等待broker的确认,如果没有收到确认,生产者认为此次消息发送失败

Async send

异步发送

The producer puts a message in a blocking queue and returns immediately. The client library sends the message to the broker in the background. If the queue is full (you can configure the maximum size), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer.

// 生产者将消息放入阻塞队列并立即返回。客户端在后台将消息发送给代理。如果队列已满(可以配置大小),则根据传递给生产者的参数,生产者会被阻止或在调用 API 时立即失败。

2.访问模式

对于生产者的主题,有以下不同类型的访问模式。

Access modeDescription

Shared

共享模式

Multiple producers can publish on a topic.
// 多个生产者可以发布一个主题
This is the default setting.

// 默认配置

Exclusive

独占

Only one producer can publish on a topic.
// 只有一个生产者可以发布一个主题
If there is already a producer connected, other producers trying to publish on this topic get errors immediately.
// 如果已经有生产者连接,其他生产者向该主题发布消息将立即报错
The “old” producer is evicted and a “new” producer is selected to be the next exclusive producer if the “old” producer experiences a network partition with the broker.

// 当一个生产者因为网络中断连接,broker 会选择一个新的 producer 成为专用生产者

ExclusiveWithFencing

独占屏障

Only one producer can publish on a topic.
// 只有一个生产者可以发布一个主题
If there is already a producer connected, it will be removed and invalidated immediately.

// 如果已经有生产者连接,其他生产者将被移除并且立即失效

WaitForExclusive

等待独占

If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the Exclusive access.
// 如果已经连接了生产者,其他生产者连接将被挂起(而不是超时),直到获得独占访问权。
The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the leader writes its "decisions" to the topic. On error cases, the leader will get notified it is no longer the leader only when it tries to write a message and fails on appropriate error, by the broker.

// 成功获取独占访问权的生产者将被视为一个leader,因此,如果你想为应用程序实现一个leader选举机制,可以选用该访问模式。需要注意的是,leader机制涉及到Pulsar的WAL日志,leader会把”决策“写入到主题。错误情况,当leader写入消息失败,会得到broker的通知,通知该生产者将不再是一个leader。

注意事项

一旦应用程序成功创建了ExclusiveWaitForExclusive访问模式的生产者,此应用程序的实例需要保证是主题的唯一编写器。任何其他试图生成有关此主题的消息的生产者都会立即出错或者必须等待,直到他们获得独占的访问权。

3.消息压缩

可以压缩生产者在传输过程中发布的消息。Pulsar目前支持以下类型的压缩:

  • LZ4:LZ算法系列的一种,号称是目前最快的压缩算法之一
  • ZLIB:zlib是用于数据压缩的一个简单的库,仅支持LZ77的变种算法
  • ZSTD:Facebook开源的新无损压缩算法,优点是压缩率和压缩/解压缩性能都很突出
  • SNAPPY:提供高速压缩速度和合理的压缩率。Snappy 比 zlib 更快,但文件相对要大 20% 到 100%。

压缩的通用原理:假如当前位置的一个字符串序列,在以前的历史数据中也出现过,那么现在用一种特殊的格式或者特殊的小序列来表示它,就可以起到压缩的效果,因为特殊格式或者特殊小序列通常都比原本的字符串序列更小。

4.批量消息

启用批处理后,生产者(producer)在单个请求中累积并发送一批消息(messages)。批量大小由最大消息数和最大发布延迟决定。因此,backlog大小表示批量的大小,而不是消息的大小。

在Pulsar中,批量消息作为单个单元而不是单个消息进行跟踪和存储。消费者需要将批量消息拆分为单个的消息。但是,即使启用了批处理,scheduled messages(通过 deliverAt 或 deliverAfter 参数配置)也始终作为单独的消息发送。

通常,当批量中的所有消息都被消费者确认时,该批量也会被确认。但是,当批量中有消息没有被确认,或者出现意外的失败,否定的确认以及确认超时会导致批量中的所有消息都重新的传递。

为了避免将批量中已确认的消息重新发送给消费者,Pulsar从Pulsar 2.6.0开始引入批量索引确认机制。启用批量索引确认机制后,消费者会过滤出已经确认过的批量索引,并将批量索引确认请求发送给代理(broker)。代理(broker)会维护和跟踪每个批量索引的确认状态,并避免向消费者(consumer)发送已确认过的消息。当批量中的所有消息都被确认后,该批量将会被删除。

默认情况下,禁用批量索引确认机制(AcknowledgementAtBatchIndexLevelEnabled=false)。但是可以通过在代理端(broker)将AcknowledgementAtBatchIndexLevelEnabled参数设置为true来启用批量索引确认机制。启用批量索引确认机制会导致更多内存开销

5.消息分块

消息分块,使Pulsa的生产者和消费者都能处理大型有效负载消息(在生产者端将消息分块,在消费端将消息聚合)。

启用消息分块后,当消息(messages)大小超过允许的最大负载大小(broker的maxMessageSize参数)时,消息的工作流如下所示:

  1. 生产者(producer)将原始消息拆分为分块消息,并将它们与分块元数据一起按顺序单独发布到代理。
  2. 代理(broker)以与普通消息相同的方式将分块消息存储在一个managed-ledger(托管账本)中,并使用chunkedMessageRate参数记录该主题上分块消息的速率。
  3. 消费者(consumer)会缓存分块的消息,并在接收到消息的所有分块时将其聚合到接收队列中。
  4. 客户端(client)消费接收队列中的聚合消息。

消息分块的限制:

  1. 只能用于持久化主题
  2. 只能用于独占(exclusive)访问和容错订阅(failover subscription)类型。
  3. 不能和批处理同时使用

处理连续的分块消息

下图展示了处理连续分块消息的过程。图中,生产者依次往一个主题中发布分块消息(大型消息)和非分块消息(常规消息M3\M4)。生产者(producer)在发布M1消息时,把M1分成了M1-C1、M1-C2和M1-C3三个块消息。broker 端会存储所有的分块消息(放在managed-ledger中),并把他们按照相同的顺序发送给消费者。消费者(consumer)会在内存中缓存接收到的分块消息,直到全部接收,然后把它们聚合为原始的消息M1,最后将原始的消息M1移交给客户端(client)。

处理不连续的分块消息

当多个生产者将分块消息发布到单个主题中时,代理(broker)将来自不同生产者的所有分块消息都存储在同一个托管账本(managed-ledger)中。托管账本中的分块消息相互交错。如下所示,生产者1将消息M1分为三个数据块M1-C1、M1-C2和M1-C3进行发布。生产者2将消息M2也分为三个块M2-C1、M2-C2和M2-C3进行发布。但是特定消息的分块消息仍处于有序状态,虽然他们在托管分类中可能不是连续的。

注意事项

在这种情况下,交错的分块消息可能会给消费者带来一些内存压力,因为消费者为每个大型消息都保留了单独的缓冲区,以便将其所有的分块消息聚合成一条消息。通过配置maxPendingChunkedMessage参数,可以限制消费者并发维护的最大分块消息数量。当维护量达到数量阈值时,消费者会暂时丢弃这些消息,随后不发送消息确认,或者要求代理重传来进行消息补偿,从而优化内存利用率。

启用消息分块

前提条件:通过将enableBatching参数设置为false来禁用批量处理

默认情况下,消息分块功能处于关闭状态。要启用消息分块,请在创建生产者时将chunkingEnabled参数设置为true。

注意事项

如果消费者未能在指定的时间段内收到消息的所有分块,那么不完整的分块消息将过期。过期的时间默认值为1分钟。有关ExpireTimeofCompletechUnkedMessage参数的更多信息,请参阅org。

原网站

版权声明
本文为[swadian2008]所创,转载请带上原文链接,感谢
https://blog.csdn.net/swadian2008/article/details/125217818