当前位置:网站首页>Pulsar 消费者
Pulsar 消费者
2022-06-13 11:53:00 【swadian2008】
目录
消费者(consumer)是通过订阅消息主题,然后接收消息的程序。
消费者向代理发送许可请求获取消息。消费者端有一个队列,用于接收从代理推送的消息。可以使用receiverQueueSize参数配置队列的大小。(默认大小为1000)。每次消费消息,调用receive(),则消息会从缓存队列中移除。
1.消息接收方式
从代理(brokers)接收消息可以是同步的也可以是异步的
| Mode | Description |
|---|---|
Sync receive // 同步 | A sync receive is blocked until a message is available. |
Async receive // 异步 | An async receive returns immediately with a future value—for example, a CompletableFuture in Java—that completes once a new message is available. // 异步接收立即返回一个future值,例如,Java中的CompletableFuture,一旦有新消息可用,它就会完成。 |
2.监听器
客户端为消费者提供侦听器的实现。例如,Java客户端提供 MessageListener 接口。在此接口中,每当接收到新消息时,都会调用received()方法。
3.消息确认
消费者在成功消费消息后会向代理(broker)发送确认请求。消息(message )将被永久存储,只有在所有订阅者都确认后才会被删除。如果要存储消费者已消费的消息,则需要配置消息的保留策略。
对于批处理消息,可以启用批索引确认机制,以避免将已确认的消息发送给消费者。有关批索引确认的详细信息,请参阅批处理。
可以通过以下两种方式之一确认消息:
- 单独确认。消费者每消费一条消息,都会向代理发送确认请求。
- 累计确认。消费者只确认其收到的最后一条消息。确认后,Stream流中所有的消息都不会重新传递给该消费者。
单独确认消息,可以使用下边API
consumer.acknowledge(msg);累计确认消息,可以使用下边API
consumer.acknowledgeCumulative(msg);注意事项
累积确认不能用于共享订阅类型( Shared subscription type),因为共享订阅类型涉及多个可以访问同一订阅的消费者。在共享订阅类型中,消息是单独确认的。
4.消息的否定确认
否定确认机制允许消费者向代理发送通知,指示消费者未处理消息。当消费者未能处理消息并需要重新消费它时,消费者会向代理(broker)发送否定确认(nack),触发代理将此消息重新传递给消费者。
消息可以单独的或累积的进行否定确认,这取决于消费的订阅模式。
- 在Exclusive(独占)和Failover subscription(容错订阅)模式中,消费者只会对他们收到的最后一条消息进行否定确认。
- 在Shared和Key_Shared订阅模式中,消费者可以单独的对每一条消息进行否定确认。
需要注意的是,对排序订阅模式(如Exclusive、Failover和Key_Shared)的否定确认可能会导致重新发送的消息超出它原来的顺序。
如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
.subscribe();
Message<byte[]> message = consumer.receive(); // 消费消息
// call the API to send negative acknowledgement,在超时之前进行否定确认
consumer.negativeAcknowledge(message); // 否定确认
message = consumer.receive();
consumer.acknowledge(message);
可以通过设置传递消息的重试次数来设置消息重传的不同时延。
Use the following API to enable Negative Redelivery Backoff.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.build())
.subscribe();消息重传的时延如下所示。
| Redelivery count | Redelivery delay |
|---|---|
| 1 | 10 + 1 seconds // 11s |
| 2 | 10 + 2 seconds // 12s |
| 3 | 10 + 4 seconds |
| 4 | 10 + 8 seconds |
| 5 | 10 + 16 seconds |
| 6 | 10 + 32 seconds |
| 7 | 10 + 60 seconds |
| 8 | 10 + 60 seconds |
注意事项
如果启用了批处理,则同一批量中的所有消息都会重新传递给消费者。
5.消息的确认超时
确认超时机制可以为客户端跟踪未确认消息设置一个时间范围。如果客户端在超时(ackTimeout)期间没有发送确认消息,那么超时后,客户端会向代理(broker)发送否定确认请求,触发broker重传。
如果规定时间内无应答,或者需要执行规定时间内检查超时确认消息的定时任务,都可以通过配置确认超时机制来触发消息重传。
确认超时机制可以通过设置重试次数来实现重传消息的不同延时。
如果要使用重传机制,可以使用以下API。
consumer.ackTimeout(10, TimeUnit.SECOND)
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder() // 启用消息重传
.minDelayMs(1000)
.maxDelayMs(60000)
.multiplier(2).build())消息重传的时延如下所示
| Redelivery count | Redelivery delay |
|---|---|
| 1 | 10 + 1 seconds |
| 2 | 10 + 2 seconds |
| 3 | 10 + 4 seconds |
| 4 | 10 + 8 seconds |
| 5 | 10 + 16 seconds |
| 6 | 10 + 32 seconds |
| 7 | 10 + 60 seconds |
| 8 | 10 + 60 seconds |
注意事项
与确认超时相比,优先选择确认否定。首先,超时时间很难确定,其次,当消息处理时间超过确认时间时,代理会重新发送消息,但这些消息可能并不需要重新消费。
Use the following API to enable acknowledgement timeout(确认超时).
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.ackTimeout(2, TimeUnit.SECONDS) // the default value is 0
.ackTimeoutTickTime(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> message = consumer.receive();
// wait at least 2 seconds
message = consumer.receive();
consumer.acknowledge(message);
6.重试信件主题( retry letter)
“retry letter”主题,会存储消费失败的消息,并在之后尝试重试消费。通过这个方式,可以自定义消息重试的时间间隔。订阅原始主题的消费者也会自动订阅“retry letter”主题。一旦“retry letter”主题中的消息达到最大重试次数,该消息将移动至死信主题,死信主题需要进行手动处理。
下图说明了重试主题的概念

使用“retry letter”主题的目的与使用延迟消息传递的目的不同,尽管两者都延后消费消息。“retry letter”主题在消息消费失败时,通过消息重传机制,确保关键数据不会丢失,而消息延迟传递是为了在指定的延迟时间传递消息。
默认情况下,禁用自动重试。可以将enableRetry设置为true,在消费者上启用自动重试。
Use the following API to consume messages from a retry letter topic. When the value of maxRedeliverCount is reached, the unconsumed messages are moved to a dead letter topic.
当达到最大重试次数(maxRedeliverCount),消息会被移动到死信主题
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true) // 开启自动重试
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
“retry letter”主题默认使用以下格式:
<topicname>-<subscriptionname>-RETRY使用 Java client 明确规定“retry letter”主题名称
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("my-retry-letter-topic-name") //重试主题
.build())
.subscribe();
“retry letter”主题中的消息包含一些由客户端自动创建的特殊属性。
| Special property | Description |
|---|---|
| The real topic name. |
| The origin message ID. It is crucial for message tracking. // 对于消息跟踪非常关键 |
| The number of retries to consume messages. |
| Message retry interval in milliseconds. // 消息重试间隔(毫秒) |
Example
REAL_TOPIC = persistent://public/default/my-topic
ORIGIN_MESSAGE_ID = 1:0:-1:0
RECONSUMETIMES = 6
DELAY_TIME = 3000
使用以下API将消息存储在重试队列中。
consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);使用以下API为Reconsumerater函数添加自定义属性。在下一次尝试消费时,可以通过message#getProperty获取自定义属性。
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);注意事项
当前,在共享订阅类型中启用了“retry letter”主题。
与否定确认相比,“retry letter”主题更适合配置了重试时间间隔且需要进行大量消息重试的场景。因为“retry letter”主题中的消息持久化在BookKeeper中(broker中),而否定确认后需要重试的消息缓存在客户端。
7.死信主题
死信主题允许继续使用消费未成功的消息。无法使用的消息存储在特定的主题中,该主题称为死信主题(dead letter topic)。你可以决定如何处理死信主题中的消息。
在Java客户端中启用默认的死信主题。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder() // 启用死信主题
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
死信主题的默认格式
<topicname>-<subscriptionname>-DLQ
使用 Java client 定义死信主题的名字
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name") // 定义死信主题的名字
.build())
.subscribe();
By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages.
默认情况下,DLQ主题创建期间没有被订阅。如果没有实时订阅DLQ主题,可能会丢失消息。
如果需要自动创建DLQ的初始订阅,可以指定 initialSubscriptionName 参数。如果设置了此参数,但代理的 allowAutoSubscriptionCreation 被禁用,则无法创建DLQ生产者。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.initialSubscriptionName("init-sub") // 创建自动订阅
.build())
.subscribe();
DLQ主题可以进行消息重传,由配置的确认超时、否定确认、重试信件主题等条件触发。
注意事项
目前,已在 Shared 和 Key_Shared 订阅类型中启用死信主题。
边栏推荐
- Interview skills Q & A
- To avoid letting the transformation enterprises go astray, it is time to re understand the integration of xiahu warehouse| Q recommendation
- 面试题 Mysql 数据库
- 【TcaplusDB知识库】TcaplusDB-tcapsvrmgr工具介绍(一)
- flutter 插件 Pluto表格使用所遇问题
- Auto.js 悬浮窗居中
- Kubernetes deploying ActiveMQ
- 基于STM32F103——SIM900A发送短信+串口打印
- Camunda定时器事件示例Demo(Timer Events)
- C#/VB. Net to generate directory bookmarks when word is converted to PDF
猜你喜欢

秒云与趋动科技联合发布容器云平台与GPU资源池化整体解决方案
![[benefits] in minutes](/img/e0/217fe575c1a31b04a9b0699ec44f25.png)
[benefits] in minutes

What is the appropriate setting for the number of database connections?

Camunda timer events example demo (timer events)

Product story | YuQue drawing board you don't know

产品故事|你所不知道的语雀画板

多系统对接的适配与包装模式应用

Camunda定时器事件示例Demo(Timer Events)

2022年二建《市政》科目答案已出,请收好

LVGL库入门教程01-移植到STM32(触摸屏)
随机推荐
面试突击56:聚簇索引和非聚簇索引有什么区别?
TS advanced condition type
全网最全,含面试题+答案
Interview skills Q & A
【管理知多少】“风险登记册”本身的风险
【TcaplusDB知识库】TcaplusDB表数据缓写介绍
产品故事|你所不知道的语雀画板
致力超表面光子芯片产品研发与制造,山河光电完成数千万元Pre-A轮融资
基于三维GIS技术的行业发展及研究现状
TS进阶之条件类型
Socket programming (medium)
UE4,UE5虚幻引擎,Command Console控制台命令,参数集
The leader said he would go online tomorrow, but he didn't know the development process at all
Lightweight real-time semantic segmentation: eNet & erfnet
[tcapulusdb knowledge base] Introduction to tcapulusdb table data caching
Wallys/Network_ Card/DR-NAS26/AR9223/2x2 MIMO
2022年二建《法规》科目答案已出,请收好
1051. 高度检查器
Web development video tutorial, web development teaching
14、wpf之Border装饰器使用小记