当前位置:网站首页>Pulsar 消费者

Pulsar 消费者

2022-06-13 11:53:00 swadian2008

目录

1.消息接收方式

2.监听器

3.消息确认

4.消息的否定确认

5.消息的确认超时

6.重试信件主题( retry letter)

7.死信主题​


消费者(consumer)是通过订阅消息主题,然后接收消息的程序。

消费者向代理发送许可请求获取消息。消费者端有一个队列,用于接收从代理推送的消息。可以使用receiverQueueSize参数配置队列的大小。(默认大小为1000)。每次消费消息,调用receive(),则消息会从缓存队列中移除。

1.消息接收方式

从代理(brokers)接收消息可以是同步的也可以是异步的

ModeDescription

Sync receive

// 同步

A sync receive is blocked until a message is available.

Async receive

// 异步

An async receive returns immediately with a future value—for example, a CompletableFuture in Java—that completes once a new message is available.

// 异步接收立即返回一个future值,例如,Java中的CompletableFuture,一旦有新消息可用,它就会完成。

2.监听器

客户端为消费者提供侦听器的实现。例如,Java客户端提供 MessageListener 接口。在此接口中,每当接收到新消息时,都会调用received()方法。

3.消息确认

消费者在成功消费消息后会向代理(broker)发送确认请求。消息(message )将被永久存储,只有在所有订阅者都确认后才会被删除。如果要存储消费者已消费的消息,则需要配置消息的保留策略。

对于批处理消息,可以启用批索引确认机制,以避免将已确认的消息发送给消费者。有关批索引确认的详细信息,请参阅批处理。

可以通过以下两种方式之一确认消息:

  1. 单独确认。消费者每消费一条消息,都会向代理发送确认请求。
  2. 累计确认。消费者只确认其收到的最后一条消息。确认后,Stream流中所有的消息都不会重新传递给该消费者。

单独确认消息,可以使用下边API

consumer.acknowledge(msg);

累计确认消息,可以使用下边API

consumer.acknowledgeCumulative(msg);

注意事项

累积确认不能用于共享订阅类型( Shared subscription type),因为共享订阅类型涉及多个可以访问同一订阅的消费者。在共享订阅类型中,消息是单独确认的。

4.消息的否定确认

否定确认机制允许消费者向代理发送通知,指示消费者未处理消息。当消费者未能处理消息并需要重新消费它时,消费者会向代理(broker)发送否定确认(nack),触发代理将此消息重新传递给消费者。

消息可以单独的或累积的进行否定确认,这取决于消费的订阅模式。

  • 在Exclusive(独占)和Failover subscription(容错订阅)模式中,消费者只会对他们收到的最后一条消息进行否定确认。
  • 在Shared和Key_Shared订阅模式中,消费者可以单独的对每一条消息进行否定确认。

需要注意的是,对排序订阅模式(如Exclusive、Failover和Key_Shared)的否定确认可能会导致重新发送的消息超出它原来的顺序。

如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认

Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub-negative-ack")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
                .subscribe();

Message<byte[]> message = consumer.receive(); // 消费消息

// call the API to send negative acknowledgement,在超时之前进行否定确认
consumer.negativeAcknowledge(message); // 否定确认

message = consumer.receive();
consumer.acknowledge(message);

可以通过设置传递消息的重试次数来设置消息重传的不同时延

Use the following API to enable Negative Redelivery Backoff.

Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("sub-negative-ack")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
            .minDelayMs(1000)
            .maxDelayMs(60 * 1000)
            .build())
        .subscribe();

消息重传的时延如下所示。

Redelivery countRedelivery delay
110 + 1 seconds // 11s
210 + 2 seconds // 12s
310 + 4 seconds
410 + 8 seconds
510 + 16 seconds
610 + 32 seconds
710 + 60 seconds
810 + 60 seconds

注意事项

如果启用了批处理,则同一批量中的所有消息都会重新传递给消费者。

5.消息的确认超时

确认超时机制可以为客户端跟踪未确认消息设置一个时间范围。如果客户端在超时(ackTimeout)期间没有发送确认消息,那么超时后,客户端会向代理(broker)发送否定确认请求,触发broker重传。

如果规定时间内无应答,或者需要执行规定时间内检查超时确认消息的定时任务,都可以通过配置确认超时机制来触发消息重传

确认超时机制可以通过设置重试次数来实现重传消息的不同延时。

如果要使用重传机制,可以使用以下API。

consumer.ackTimeout(10, TimeUnit.SECOND)
        .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder() // 启用消息重传
        .minDelayMs(1000)
        .maxDelayMs(60000)
        .multiplier(2).build())

消息重传的时延如下所示

Redelivery countRedelivery delay
110 + 1 seconds
210 + 2 seconds
310 + 4 seconds
410 + 8 seconds
510 + 16 seconds
610 + 32 seconds
710 + 60 seconds
810 + 60 seconds

注意事项

与确认超时相比,优先选择确认否定。首先,超时时间很难确定,其次,当消息处理时间超过确认时间时,代理会重新发送消息,但这些消息可能并不需要重新消费。

Use the following API to enable acknowledgement timeout(确认超时).

Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .ackTimeout(2, TimeUnit.SECONDS) // the default value is 0
                .ackTimeoutTickTime(1, TimeUnit.SECONDS)
                .subscriptionName("sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

Message<byte[]> message = consumer.receive();

// wait at least 2 seconds
message = consumer.receive();
consumer.acknowledge(message);

6.重试信件主题( retry letter)

“retry letter”主题,会存储消费失败的消息,并在之后尝试重试消费。通过这个方式,可以自定义消息重试的时间间隔。订阅原始主题的消费者也会自动订阅“retry letter”主题。一旦“retry letter”主题中的消息达到最大重试次数,该消息将移动至死信主题,死信主题需要进行手动处理。

下图说明了重试主题的概念

使用“retry letter”主题的目的与使用延迟消息传递的目的不同,尽管两者都延后消费消息。“retry letter”主题在消息消费失败时,通过消息重传机制,确保关键数据不会丢失,而消息延迟传递是为了在指定的延迟时间传递消息。

默认情况下,禁用自动重试。可以将enableRetry设置为true,在消费者上启用自动重试。

Use the following API to consume messages from a retry letter topic. When the value of maxRedeliverCount is reached, the unconsumed messages are moved to a dead letter topic.

当达到最大重试次数(maxRedeliverCount),消息会被移动到死信主题

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true) // 开启自动重试
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .build())
                .subscribe();

“retry letter”主题默认使用以下格式:

<topicname>-<subscriptionname>-RETRY

使用 Java client 明确规定“retry letter”主题名称

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(maxRedeliveryCount)
                .retryLetterTopic("my-retry-letter-topic-name") //重试主题
                .build())
        .subscribe();

“retry letter”主题中的消息包含一些由客户端自动创建的特殊属性

Special propertyDescription

REAL_TOPIC

//真正主题

The real topic name.

ORIGIN_MESSAGE_ID

//原始消息ID

The origin message ID. It is crucial for message tracking.

// 对于消息跟踪非常关键

RECONSUMETIMES

//重新消费次数

The number of retries to consume messages.

DELAY_TIME

//延迟时间

Message retry interval in milliseconds.

// 消息重试间隔(毫秒)

Example

REAL_TOPIC = persistent://public/default/my-topic
ORIGIN_MESSAGE_ID = 1:0:-1:0
RECONSUMETIMES = 6
DELAY_TIME = 3000

使用以下API将消息存储在重试队列中。

consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);

使用以下API为Reconsumerater函数添加自定义属性。在下一次尝试消费时,可以通过message#getProperty获取自定义属性。

Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);

注意事项

当前,在共享订阅类型中启用了“retry letter”主题。

与否定确认相比,“retry letter”主题更适合配置了重试时间间隔且需要进行大量消息重试的场景。因为“retry letter”主题中的消息持久化在BookKeeper中(broker中),而否定确认后需要重试的消息缓存在客户端。

7.死信主题

死信主题允许继续使用消费未成功的消息。无法使用的消息存储在特定的主题中,该主题称为死信主题(dead letter topic)。你可以决定如何处理死信主题中的消息。

在Java客户端中启用默认的死信主题。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder() // 启用死信主题
                      .maxRedeliverCount(maxRedeliveryCount)
                      .build())
                .subscribe();

死信主题的默认格式

<topicname>-<subscriptionname>-DLQ

使用 Java client 定义死信主题的名字

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                      .maxRedeliverCount(maxRedeliveryCount)
                      .deadLetterTopic("my-dead-letter-topic-name") // 定义死信主题的名字
                      .build())
                .subscribe();

By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages.

默认情况下,DLQ主题创建期间没有被订阅。如果没有实时订阅DLQ主题,可能会丢失消息。

如果需要自动创建DLQ的初始订阅,可以指定 initialSubscriptionName 参数。如果设置了此参数,但代理的 allowAutoSubscriptionCreation 被禁用,则无法创建DLQ生产者。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                      .maxRedeliverCount(maxRedeliveryCount)
                      .deadLetterTopic("my-dead-letter-topic-name")
                      .initialSubscriptionName("init-sub") // 创建自动订阅
                      .build())
                .subscribe();

DLQ主题可以进行消息重传,由配置的确认超时、否定确认、重试信件主题等条件触发。

注意事项

目前,已在 Shared 和 Key_Shared 订阅类型中启用死信主题。

原网站

版权声明
本文为[swadian2008]所创,转载请带上原文链接,感谢
https://blog.csdn.net/swadian2008/article/details/125221946