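Once a subscription has been created, Pulsar retains all messages, even if the consumer is disconnected. Retained messages are discarded only when a consumer acknowledges that they have been successfully processed.

Messages are the basic "unit" of Pulsar. The following table lists the components of messages.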

Producers

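A producer is a process that attaches to a topic and publishes messages to a Pulsar broker. The Pulsar broker processes the messages.

Producers send messages to brokers either synchronously (sync) or asynchronously (async).

| Send mode | Description |
|-----------|-------------|
| Sync send | The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgement is not received, the producer treats the sending operation as a failure. |
| Async send | The producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full, the producer is either blocked or fails immediately when calling the API, depending on the arguments passed to the producer. |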
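The async path in the table can be pictured as a bounded queue that a background sender drains. Below is a minimal JDK-only sketch. It is not the Pulsar client's code. The class `AsyncSendQueueModel`, the `blockWhenFull` flag and `sendNextToBroker()` are hypothetical names. They stand in for the producer's pending queue, the producer arguments that choose between blocking and failing, and the background send loop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of an async producer's pending queue; not Pulsar's implementation.
class AsyncSendQueueModel {
    private final BlockingQueue<String> pending;
    // Hypothetical switch: block the caller when the queue is full, or fail fast.
    private final boolean blockWhenFull;

    AsyncSendQueueModel(int maxPending, boolean blockWhenFull) {
        this.pending = new ArrayBlockingQueue<>(maxPending);
        this.blockWhenFull = blockWhenFull;
    }

    // Returns as soon as the message is queued; a background sender ships it later.
    boolean sendAsync(String msg) {
        if (!blockWhenFull) {
            return pending.offer(msg); // false immediately when the queue is full
        }
        try {
            pending.put(msg); // waits until the background sender frees a slot
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Stands in for the background thread that sends one message to the broker.
    String sendNextToBroker() {
        return pending.poll();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

`offer` gives the fail-immediately behavior and `put` gives the blocking one. A real producer chooses between them based on its configuration.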

Compression

You can compress messages published by producers during transportation. Pulsar currently supports the following types of compression:

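Batching

When batching is enabled, the producer accumulates messages and sends them as a batch in a single request. The batch size is defined by the maximum number of messages and the maximum publish latency. As a result, the backlog size counts batches, not individual messages.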
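The following JDK-only sketch makes the two flush triggers concrete. It is not the Pulsar client's batch container. `BatchAccumulatorModel` is hypothetical. It sends a batch when either the message-count limit or the publish-delay limit is reached, and it counts backlog in batches as described above. Time is passed in explicitly so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified producer-side batching: flush on batch size or on age of the oldest queued message.
class BatchAccumulatorModel {
    private final int maxMessages;
    private final long maxPublishDelayMillis;
    private final List<String> current = new ArrayList<>();
    private long oldestAddedAt;
    private final List<List<String>> sentBatches = new ArrayList<>();

    BatchAccumulatorModel(int maxMessages, long maxPublishDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxPublishDelayMillis = maxPublishDelayMillis;
    }

    void add(String msg, long nowMillis) {
        if (current.isEmpty()) {
            oldestAddedAt = nowMillis;
        }
        current.add(msg);
        if (current.size() >= maxMessages) {
            flush(); // message-count limit reached
        }
    }

    // Called periodically, e.g. by a timer in a real client.
    void tick(long nowMillis) {
        if (!current.isEmpty() && nowMillis - oldestAddedAt >= maxPublishDelayMillis) {
            flush(); // publish-latency limit reached
        }
    }

    private void flush() {
        sentBatches.add(new ArrayList<>(current)); // one request, stored as one unit
        current.clear();
    }

    // Backlog as counted on the broker side: batches, not messages.
    int batchCount() {
        return sentBatches.size();
    }

    int messageCount() {
        int n = 0;
        for (List<String> batch : sentBatches) {
            n += batch.size();
        }
        return n;
    }
}
```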

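In Pulsar, batches are tracked and stored as single units rather than as individual messages. The consumer unbundles a batch into individual messages. However, scheduled messages (configured through the deliverAt or the deliverAfter parameter) are always sent as individual messages, even when batching is enabled.

In general, a batch is acknowledged only when all of its messages have been acknowledged by a consumer. As a result, unexpected failures, negative acknowledgements or acknowledgement timeouts can cause every message in a batch to be redelivered, even messages that were already acknowledged.

To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar has provided batch index acknowledgement since Pulsar 2.6.0. When batch index acknowledgement is enabled, the consumer filters out the batch indexes that have already been acknowledged and sends a batch index acknowledgement request to the broker. The broker maintains and tracks the acknowledgement status of each batch index so that it does not dispatch acknowledged messages to the consumer. When all the indexes of a batch are acknowledged, the batch is deleted.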
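The difference between whole-batch redelivery and batch index acknowledgement can be sketched with one bit set per batch. This is a simplified JDK-only model built on stated assumptions, not the broker's implementation. `BatchIndexAckModel`, `canDelete()` and `redeliver()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Simplified model of one stored batch and which of its indexes have been acknowledged.
class BatchIndexAckModel {
    private final int batchSize;
    private final BitSet acked;

    BatchIndexAckModel(int batchSize) {
        this.batchSize = batchSize;
        this.acked = new BitSet(batchSize);
    }

    void ack(int index) {
        acked.set(index);
    }

    // The batch can only be deleted once every index has been acknowledged.
    boolean canDelete() {
        return acked.cardinality() == batchSize;
    }

    // Indexes dispatched again after a failure, negative ack, or ack timeout.
    List<Integer> redeliver(boolean batchIndexAckEnabled) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            // Without batch index ack the whole batch comes back; with it, only unacked indexes.
            if (!batchIndexAckEnabled || !acked.get(i)) {
                out.add(i);
            }
        }
        return out;
    }
}
```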

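By default, batch index acknowledgement is disabled (batchIndexAcknowledgeEnable=false). You can enable it by setting batchIndexAcknowledgeEnable to true on the broker. Enabling batch index acknowledgement results in more memory overhead.

Chunking

Before you enable chunking, read the following instructions.

  • Batching and chunking cannot be enabled simultaneously. To enable chunking, you must first disable batching.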
  • Chunking is only supported for persisted topics.
  • Chunking is only supported for the exclusive and failover subscription modes.

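When chunking is enabled (chunkingEnabled=true) and a message is larger than the allowed maximum publish-payload size, the producer splits the original message into chunked messages. It then publishes the chunks to the broker separately and in order, together with chunk metadata. On the broker side, chunked messages are stored in the managed ledger in the same way as ordinary messages. The only difference is that the consumer has to buffer the chunked messages and combine them into the real message once all the chunks have been collected. Chunked messages in the managed ledger can be interwoven with ordinary messages. If the producer fails to publish all the chunks of a message, and the consumer does not receive every chunk within the expire time, the consumer can expire the incomplete chunks. By default, the expire time is one hour.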
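Here is a sketch of the producer-side split, assuming a fixed maximum chunk size. `ChunkSplitterModel` and its `Chunk` fields (`uuid`, `chunkId`, `totalChunks`) are hypothetical stand-ins for the chunk metadata mentioned above. They are not Pulsar's actual metadata field names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified producer-side chunking; field names are hypothetical.
class ChunkSplitterModel {
    static final class Chunk {
        final String uuid;      // identifies the original message
        final int chunkId;      // 0-based position of this chunk
        final int totalChunks;  // tells the consumer when it has every chunk
        final byte[] payload;

        Chunk(String uuid, int chunkId, int totalChunks, byte[] payload) {
            this.uuid = uuid;
            this.chunkId = chunkId;
            this.totalChunks = totalChunks;
            this.payload = payload;
        }
    }

    // Splits a payload into ordered chunks of at most maxChunkSize bytes.
    static List<Chunk> split(String uuid, byte[] message, int maxChunkSize) {
        int total = Math.max(1, (message.length + maxChunkSize - 1) / maxChunkSize);
        List<Chunk> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * maxChunkSize;
            int to = Math.min(message.length, from + maxChunkSize);
            chunks.add(new Chunk(uuid, i, total, Arrays.copyOfRange(message, from, to)));
        }
        return chunks;
    }
}
```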

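The consumer buffers chunked messages until it has received all the chunks of a message. It then stitches the chunks together and puts the resulting message into the receiver queue, from which clients consume messages. Once the consumer has consumed and acknowledged the entire large message, it internally sends acknowledgements for all the chunked messages associated with that message. You can limit how many chunked messages a consumer buffers concurrently by setting maxPendingChuckedMessage on the consumer. When the threshold is reached, the consumer drops pending messages. It either acknowledges them silently or asks the broker to redeliver them later by marking them as unacknowledged.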
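The consumer side can be sketched as one buffer per incomplete message plus a cap on how many incomplete messages are kept. In this JDK-only model, which is not Pulsar's code:

- `ChunkReassemblerModel` and its methods are hypothetical.
- The cap plays the role of maxPendingChuckedMessage.
- Dropping is simplified to discarding the oldest incomplete message, rather than acknowledging it or requesting redelivery.

Because buffers are keyed by message, the model also handles interleaved chunks from several producers.

```java
import java.io.ByteArrayOutputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Optional;

// Simplified consumer-side reassembly with a cap on incomplete messages; not Pulsar's implementation.
class ChunkReassemblerModel {
    private final int maxPendingChunkedMessages;
    // One buffer per incomplete message; insertion order identifies the oldest one.
    private final LinkedHashMap<String, ByteArrayOutputStream> pending = new LinkedHashMap<>();
    private int dropped;

    ChunkReassemblerModel(int maxPendingChunkedMessages) {
        this.maxPendingChunkedMessages = maxPendingChunkedMessages;
    }

    // Returns the whole message when its last chunk arrives, otherwise empty.
    Optional<byte[]> onChunk(String uuid, int chunkId, int totalChunks, byte[] payload) {
        ByteArrayOutputStream buf = pending.get(uuid);
        if (buf == null) {
            if (chunkId != 0) {
                return Optional.empty(); // earlier chunks were dropped; ignore the rest
            }
            if (pending.size() >= maxPendingChunkedMessages) {
                // Threshold reached: drop the oldest incomplete message.
                Iterator<String> oldest = pending.keySet().iterator();
                oldest.next();
                oldest.remove();
                dropped++;
            }
            buf = new ByteArrayOutputStream();
            pending.put(uuid, buf);
        }
        buf.write(payload, 0, payload.length);
        if (chunkId == totalChunks - 1) {
            pending.remove(uuid);
            return Optional.of(buf.toByteArray());
        }
        return Optional.empty();
    }

    int droppedMessages() {
        return dropped;
    }

    int pendingMessages() {
        return pending.size();
    }
}
```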

The broker does not require any changes to support chunking for non-shared subscriptions. The broker only uses chuckedMessageRate to record the chunked message rate on the topic.

Handle chunked messages with one producer and one ordered consumer

As shown in the following figure, a topic can have one producer that publishes large message payloads as chunked messages alongside regular non-chunked messages. The producer publishes message M1 in three chunks: M1-C1, M1-C2 and M1-C3. The broker stores all three chunked messages in the managed ledger and dispatches them in the same order to the ordered (exclusive/failover) consumer. The consumer buffers the chunks in memory until it has received all of them. It then combines them into one message and hands the original message M1 over to the client.

Handle chunked messages with multiple producers and one ordered consumer

When multiple publishers publish chunked messages into a single topic, the broker stores all the chunked messages from the different publishers in the same managed ledger. As shown below, Producer 1 publishes message M1 in three chunks (M1-C1, M1-C2 and M1-C3), and Producer 2 publishes message M2 in three chunks (M2-C1, M2-C2 and M2-C3). The chunks of each message are still in order, but they might not be consecutive in the managed ledger. This puts some memory pressure on the consumer. The consumer keeps a separate buffer for each large message, where it aggregates all of that message's chunks and combines them into one message.


Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumers send a flow permit request to a broker to get messages. A queue on the consumer side receives the messages that the broker pushes. You can configure the queue size through a consumer parameter; the default size is 1000. Each call to consumer.receive() takes one message from this buffer.
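The interplay between flow permits and the receiver queue can be modeled in a few lines. This is a JDK-only sketch built on stated assumptions, not the Pulsar client. `ReceiverQueueModel`, `brokerPush()` and `receive()` are hypothetical. The model also assumes that one permit is granted back per received message, whereas a real client may group permits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a consumer's receiver queue and flow permits; not Pulsar's implementation.
class ReceiverQueueModel {
    private final Deque<String> buffer = new ArrayDeque<>();
    private int permits; // how many more messages the broker may push

    ReceiverQueueModel(int queueSize) {
        this.permits = queueSize; // the initial flow request covers the whole queue
    }

    // Broker side: push backlog messages, never more than the outstanding permits.
    int brokerPush(Deque<String> backlog) {
        int pushed = 0;
        while (permits > 0 && !backlog.isEmpty()) {
            buffer.addLast(backlog.pollFirst());
            permits--;
            pushed++;
        }
        return pushed;
    }

    // Consumer side: like consumer.receive(), take one message from the local buffer.
    String receive() {
        String msg = buffer.pollFirst();
        if (msg != null) {
            permits++; // the freed slot becomes a new flow permit
        }
        return msg;
    }

    int buffered() {
        return buffer.size();
    }
}
```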

Receive modes

Messages are received from brokers either synchronously (sync) or asynchronously (async).

Listeners

Client libraries provide listener implementations for consumers. For example, the Java client provides a MessageListener interface. In this interface, the received method is called whenever a new message arrives.

Acknowledgement

When a consumer consumes a message successfully, it sends an acknowledgement request to the broker. The message is stored persistently and is deleted only after all the subscriptions have acknowledged it. If you want to keep messages even after a consumer has acknowledged them, configure a message retention policy.

For batch messages with batch index acknowledgement enabled, the broker tracks the acknowledgement status of each batch index so that it does not dispatch acknowledged messages to the consumer. When all the indexes of a batch are acknowledged, the batch is deleted. For details about batch index acknowledgement, see the batching section above.


Messages can be acknowledged in the following two ways:

  • Messages are acknowledged individually. With individual acknowledgement, the consumer acknowledges each message and sends an acknowledgement request to the broker.
  • Messages are acknowledged cumulatively. With cumulative acknowledgement, the consumer only acknowledges the last message it received. All messages in the stream up to and including that message are not redelivered to that consumer.
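The two acknowledgement styles differ only in how much state they advance. The sketch below models one subscription's view of message IDs `0..n-1`. The method names echo the Java client's `acknowledge` and `acknowledgeCumulative`, but `AckStateModel` itself and everything inside it are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified per-subscription ack state over message IDs 0..n-1; not Pulsar's implementation.
class AckStateModel {
    private final int totalMessages;
    private long cumulativeUpTo = -1; // every ID <= this is acknowledged
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    AckStateModel(int totalMessages) {
        this.totalMessages = totalMessages;
    }

    // Individual ack: only this one message is marked.
    void acknowledge(long id) {
        individuallyAcked.add(id);
    }

    // Cumulative ack: this message and everything before it are marked.
    void acknowledgeCumulative(long id) {
        cumulativeUpTo = Math.max(cumulativeUpTo, id);
        individuallyAcked.headSet(id, true).clear(); // now covered by the cumulative position
    }

    // Messages that could still be redelivered to this consumer.
    List<Long> unacknowledged() {
        List<Long> out = new ArrayList<>();
        for (long id = cumulativeUpTo + 1; id < totalMessages; id++) {
            if (!individuallyAcked.contains(id)) {
                out.add(id);
            }
        }
        return out;
    }
}
```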

Note

Cumulative acknowledgement cannot be used in the shared subscription mode, because the shared subscription mode involves multiple consumers who have access to the same subscription. In the shared subscription mode, messages are acknowledged individually.

Negative acknowledgement

When a consumer fails to consume a message and wants to consume it again, the consumer sends a negative acknowledgement to the broker, and the broker then redelivers the message.

Messages are negatively acknowledged either one by one or cumulatively, depending on the subscription mode.

In the exclusive and failover subscription modes, consumers only negatively acknowledge the last message they receive.

In the shared and Key_Shared subscription modes, you can negatively acknowledge messages individually.

Note
If batching is enabled, the negatively acknowledged messages and the other messages in the same batch are redelivered to the consumer.

If a message is not consumed successfully and you want the broker to redeliver it automatically, you can use the automatic redelivery mechanism for unacknowledged messages. When an acknowledgement timeout is specified, the client tracks unacknowledged messages over the entire acktimeout range. When the timeout expires, it automatically asks the broker to redeliver those messages.
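The client-side bookkeeping behind the acknowledgement timeout can be sketched as a map from message ID to receive time. This JDK-only model is an illustration under stated assumptions, not the client's real tracker. `AckTimeoutTrackerModel` and `collectTimedOut()` are hypothetical names, and time is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified client-side tracker for the ack-timeout mechanism; not Pulsar's implementation.
class AckTimeoutTrackerModel {
    private final long ackTimeoutMillis;
    private final Map<String, Long> receivedAt = new HashMap<>(); // message ID -> time handed to the app

    AckTimeoutTrackerModel(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    void onReceive(String messageId, long nowMillis) {
        receivedAt.put(messageId, nowMillis);
    }

    void onAcknowledge(String messageId) {
        receivedAt.remove(messageId); // acknowledged messages are no longer tracked
    }

    // IDs the client would ask the broker to redeliver at time nowMillis (sorted for stable output).
    List<String> collectTimedOut(long nowMillis) {
        List<String> expired = new ArrayList<>();
        receivedAt.entrySet().removeIf(e -> {
            boolean timedOut = nowMillis - e.getValue() >= ackTimeoutMillis;
            if (timedOut) {
                expired.add(e.getKey());
            }
            return timedOut;
        });
        expired.sort(null);
        return expired;
    }
}
```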

Note
If batching is enabled, the unacknowledged messages and the other messages in the same batch are redelivered to the consumer.

Note
Prefer negative acknowledgements over acknowledgement timeout. Negative acknowledgement controls the redelivery of individual messages more precisely. It also avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.

Dead letter topic

Dead letter topic lets you keep consuming new messages when a consumer cannot successfully consume some messages. In this mechanism, messages that fail to be consumed are stored in a separate topic, called the dead letter topic. You can decide how to handle the messages in the dead letter topic.

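The default dead letter topic uses this format:

```
<topicname>-<subscriptionname>-DLQ
```

The following example shows how to enable dead letter topic in a Java client and specify the name of the dead letter topic instead of using the default one:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

// pulsarClient, topic, and maxRedeliveryCount are assumed to be defined elsewhere
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
        .topic(topic)
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)       // dead letter topic requires shared mode
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(maxRedeliveryCount)  // redeliveries before a message goes to the DLQ
                .deadLetterTopic("your-topic-name")     // overrides the default DLQ name
                .build())
        .subscribe();
```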

Dead letter topic depends on message redelivery. Messages are redelivered either because of an acknowledgement timeout or because of a negative acknowledgement. If you use negative acknowledgement on a message, make sure it is negatively acknowledged before the acknowledgement timeout.
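The routing decision can be sketched as a per-message redelivery counter. The default topic name follows the `<topicname>-<subscriptionname>-DLQ` format given above. The exact counting rule used here (park the message once it has been redelivered `maxRedeliverCount` times) is an assumption, not Pulsar's verified logic. `DeadLetterRouterModel` and `onFailure()` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified dead-letter routing; the counting rule is an assumption, not Pulsar's exact logic.
class DeadLetterRouterModel {
    private final int maxRedeliverCount;
    private final String deadLetterTopic;
    private final Map<String, Integer> redeliveries = new HashMap<>();

    DeadLetterRouterModel(String topic, String subscription, int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
        this.deadLetterTopic = defaultDeadLetterTopic(topic, subscription);
    }

    // Default name format: <topicname>-<subscriptionname>-DLQ
    static String defaultDeadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    // Called on a negative ack or an ack timeout; returns where the message goes next.
    String onFailure(String messageId) {
        int count = redeliveries.getOrDefault(messageId, 0);
        if (count >= maxRedeliverCount) {
            redeliveries.remove(messageId);
            return deadLetterTopic; // give up and park the message
        }
        redeliveries.put(messageId, count + 1);
        return "redeliver";
    }
}
```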

Note
Currently, dead letter topic is enabled only in the shared subscription mode.

Retry letter topic

In many online business systems, a message has to be consumed again when an exception occurs during business logic processing. To configure a delay before failed messages are consumed again, configure the producer to send messages to both the business topic and the retry letter topic, and enable automatic retry on the consumer. With automatic retry enabled, a message that is not consumed is stored in the retry letter topic. The consumer then automatically consumes the failed message from the retry letter topic after the specified delay.

By default, automatic retry is disabled. You can set enableRetry to true to enable automatic retry on the consumer.

This example shows how to consume messages from a retry letter topic.

Topic

As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic names are URLs with a well-defined structure:

```
{persistent|non-persistent}://tenant/namespace/topic
```

| Topic name component | Description |
|---|---|
| persistent / non-persistent | Identifies the type of topic. Pulsar supports two kinds of topics: persistent and non-persistent. The default is persistent, so if you do not specify a type, the topic is persistent. With persistent topics, all messages are durably persisted on disks (on multiple disks if the broker is not standalone), whereas data for non-persistent topics is not persisted to storage disks. |
| tenant | The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and they are spread across clusters. |
| namespace | The administrative unit of the topic, which groups related topics together. Most topic configuration is performed at the namespace level. Each tenant has one or more namespaces. |
| topic | The final part of the name. Topic names have no special meaning in a Pulsar instance. |
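The table can be checked against a small parser that splits a name into its four components and applies the persistent default. `TopicNameModel` is a hypothetical helper, not Pulsar's own topic-name class. It only accepts the full `tenant/namespace/topic` form shown above and does not handle any shorthand names.

```java
// Hypothetical parser for {persistent|non-persistent}://tenant/namespace/topic names.
final class TopicNameModel {
    final boolean persistent;
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameModel(boolean persistent, String tenant, String namespace, String topic) {
        this.persistent = persistent;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicNameModel parse(String name) {
        boolean persistent = true; // the type defaults to persistent when omitted
        String rest = name;
        int sep = name.indexOf("://");
        if (sep >= 0) {
            String type = name.substring(0, sep);
            if (type.equals("non-persistent")) {
                persistent = false;
            } else if (!type.equals("persistent")) {
                throw new IllegalArgumentException("unknown topic type: " + type);
            }
            rest = name.substring(sep + 3);
        }
        String[] parts = rest.split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + name);
        }
        return new TopicNameModel(persistent, parts[0], parts[1], parts[2]);
    }
}
```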

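A namespace is a logical grouping within a tenant. A tenant creates multiple namespaces via the admin API. For instance, a tenant with several applications can create a separate namespace for each application. A namespace lets the application create and manage a hierarchy of topics. For example, my-tenant/app1 is the namespace for the application app1 of the tenant my-tenant. You can create any number of topics under a namespace.

Subscriptions

A subscription is a named configuration rule that determines how messages are delivered to consumers. Four subscription modes are available in Pulsar: exclusive, shared, failover and key_shared. The figure below illustrates these modes.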

Pub-Sub or Queuing

In Pulsar, you can use different subscriptions flexibly.

  • If you want to achieve traditional “fan-out pub-sub messaging” among consumers, specify a unique subscription name for each consumer. This is the exclusive subscription mode.
  • If you want to achieve “message queuing” among consumers, share the same subscription name among multiple consumers (shared, failover, key_shared).
  • If you want to achieve both effects simultaneously, combine the exclusive subscription mode with other subscription modes for consumers.

Exclusive

In exclusive mode, only a single consumer is allowed to attach to the subscription. If multiple consumers subscribe to a topic using the same subscription, an error occurs.

In the diagram below, only Consumer A-0 is allowed to consume messages.

Exclusive is the default subscription mode.

Figure: Exclusive subscriptions

Failover

In failover mode, multiple consumers can attach to the same subscription. A master consumer is picked to receive messages, either for a non-partitioned topic or for each partition of a partitioned topic. When the master consumer disconnects, all messages (non-acknowledged and subsequent) are delivered to the next consumer in line.

For partitioned topics, the broker sorts consumers by priority level and then by the lexicographical order of their names. It then tries to assign the topic partitions evenly to the consumers with the highest priority level.

For a non-partitioned topic, the broker picks consumers in the order in which they subscribed to the topic.
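The partitioned case can be sketched as: keep only the highest-priority consumers, sort them by name, then spread partitions across them in turn. This is a simplified model, not the broker's exact algorithm. It assumes that a lower `priorityLevel` number means a higher priority and that partitions are spread by index modulo the number of consumers. `FailoverAssignmentModel` and `ConsumerInfo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Simplified failover assignment for a partitioned topic; not Pulsar's exact algorithm.
class FailoverAssignmentModel {
    static final class ConsumerInfo {
        final String name;
        final int priorityLevel; // assumption: lower number = higher priority

        ConsumerInfo(String name, int priorityLevel) {
            this.name = name;
            this.priorityLevel = priorityLevel;
        }
    }

    // For each partition index, returns the name of its master (active) consumer.
    static List<String> assign(List<ConsumerInfo> consumers, int numPartitions) {
        int best = consumers.stream().mapToInt(c -> c.priorityLevel).min().orElseThrow();
        List<String> active = consumers.stream()
                .filter(c -> c.priorityLevel == best)        // highest priority level only
                .map(c -> c.name)
                .sorted(Comparator.naturalOrder())           // lexicographical order of names
                .collect(Collectors.toList());
        List<String> owners = new ArrayList<>(numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            owners.add(active.get(p % active.size()));       // spread partitions evenly
        }
        return owners;
    }
}
```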

In the diagram below, Consumer-B-0 is the master consumer while Consumer-B-1 would be the next consumer in line to receive messages if Consumer-B-0 is disconnected.

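Shared

In shared or round robin mode, multiple consumers can attach to the same subscription. Messages are distributed across consumers in round robin fashion, and each message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it but not acknowledged are rescheduled and sent to the remaining consumers.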
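Round robin dispatch and the rescheduling of a departed consumer's unacknowledged messages fit in a short JDK-only model. `SharedDispatcherModel` is hypothetical and deliberately naive. For example, after a disconnect it simply keeps rotating over the remaining consumers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified shared-subscription dispatcher; not Pulsar's implementation.
class SharedDispatcherModel {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, List<String>> unacked = new LinkedHashMap<>();
    private int next = 0;

    void connect(String consumer) {
        consumers.add(consumer);
        unacked.put(consumer, new ArrayList<>());
    }

    // Each message goes to exactly one consumer, rotating through the list.
    String dispatch(String message) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers attached");
        }
        String target = consumers.get(next % consumers.size());
        next++;
        unacked.get(target).add(message);
        return target;
    }

    void ack(String consumer, String message) {
        unacked.get(consumer).remove(message);
    }

    // Unacknowledged messages of the departed consumer are re-dispatched to the survivors.
    List<String> disconnect(String consumer) {
        List<String> orphaned = unacked.remove(consumer);
        consumers.remove(consumer);
        List<String> targets = new ArrayList<>();
        for (String m : orphaned) {
            targets.add(dispatch(m));
        }
        return targets;
    }
}
```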

In the diagram below, Consumer-C-1 and Consumer-C-2 are able to subscribe to the topic, but Consumer-C-3 and others could as well.

Limitations of shared mode

When using shared mode, be aware that:

  • Message ordering is not guaranteed.
  • You cannot use cumulative acknowledgement with shared mode.

Figure: Shared subscriptions

Key_Shared

In Key_Shared mode, multiple consumers can attach to the same subscription. Messages are distributed across consumers, and all messages with the same key or the same ordering key are delivered to only one consumer. No matter how many times a message is redelivered, it goes to the same consumer. When a consumer connects or disconnects, the consumer that serves some of the message keys changes.
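The key-to-consumer property can be illustrated with plain hashing. This is a simplified sketch, not Pulsar's consumer selector, which is more elaborate. It assumes that `key.hashCode()` modulo the number of consumers picks the consumer, and `KeySharedModel` is hypothetical.

```java
import java.util.List;

// Simplified Key_Shared routing; not Pulsar's selector.
// Assumption: key.hashCode() modulo the number of consumers chooses the consumer.
class KeySharedModel {
    static String consumerFor(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers attached");
        }
        // Same key and same consumer list always yield the same consumer.
        return consumers.get(Math.floorMod(key.hashCode(), consumers.size()));
    }
}
```

With consumers `C1, C2, C3`, key `b` maps to `C3`. After `C3` leaves, the same key maps to `C1`, which mirrors the note that a consumer connecting or disconnecting changes which consumer serves some keys.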

Limitations of Key_Shared mode

When you use Key_Shared mode, be aware that:

  • You need to specify a key or orderingKey for messages.
  • You cannot use cumulative acknowledgement with Key_Shared mode.

You can disable Key_Shared mode in broker.config.

Multi-topic subscriptions

When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as persistent://public/default/my-topic. Since Pulsar 1.23.0-incubating, however, Pulsar consumers can subscribe to multiple topics simultaneously. You can define a list of topics in two ways:

  • On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
  • By explicitly defining a list of topics

When subscribing to multiple topics, the Pulsar client automatically makes a call to the Pulsar API to discover the topics that match the regex pattern/list, and then subscribe to all of them. If any of the topics do not exist, the consumer auto-subscribes to them once the topics are created.
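The matching step of a regex subscription can be sketched with `java.util.regex`. The topic list below is hypothetical, because a real client obtains the topics from the broker, and `TopicPatternModel` is not a Pulsar API. As the sketch assumes, the pattern has to match the full topic name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch of client-side topic filtering for a regex subscription; the topic list is supplied by the caller.
class TopicPatternModel {
    static List<String> matching(Pattern pattern, List<String> topicsInNamespace) {
        return topicsInNamespace.stream()
                .filter(t -> pattern.matcher(t).matches()) // full-name match
                .collect(Collectors.toList());
    }
}
```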

No ordering guarantees across multiple topics

When a producer sends messages to a single topic, all messages are guaranteed to be read from that topic in the same order. However, these guarantees do not hold across multiple topics. So when a producer sends messages to multiple topics, the order in which messages are read from those topics is not guaranteed to be the same.

The following are multi-topic subscription examples for Java.

```java
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

// Instantiate the Pulsar client object (the service URL is an example)
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
        .topicsPattern(allTopicsInNamespace)
        .subscriptionName("subscription-1")
        .subscribe();

// Subscribe to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
        .topicsPattern(someTopicsInNamespace)
        .subscriptionName("subscription-1")
        .subscribe();
```


Partitioned topics

Normal topics are served only by a single broker, which limits the maximum throughput of the topic. Partitioned topics are a special type of topic that are handled by multiple brokers, thus allowing for higher throughput.

A partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When you publish messages to a partitioned topic, each message is routed to one of several brokers. Pulsar handles the distribution of partitions across brokers automatically.

The diagram below illustrates this:


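The Topic1 topic has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions each, while the third handles only one. Again, Pulsar handles this distribution of partitions automatically.

Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which consumer each message is delivered to.

Subscription modes work the same way for partitioned topics as for normal topics. Partitioning only affects what happens between the moment a producer publishes a message and the moment a consumer processes and acknowledges it.

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition (that is, which internal topic) each message should be published to.

Three MessageRoutingMode options are available: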

Ordering guarantee

The ordering of messages depends on the MessageRoutingMode and the message key. Usually, users want a per-key-partition ordering guarantee.

If a message has a key and you use either the SinglePartition or the RoundRobinPartition mode, the message is routed to the matching partition based on the hashing scheme specified in ProducerBuilder.

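| Ordering guarantee | Description | Routing mode and key |
|---|---|---|
| Per-key-partition | All the messages with the same key are in order and placed in the same partition. | Use either the SinglePartition or the RoundRobinPartition mode, and provide a key for each message. |
| Per-producer | All the messages from the same producer are in order. | Use the SinglePartition mode, and provide no key for any message. |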
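The table's rules can be sketched as a router. The routing rules are:

- A key is hashed to a partition in both modes.
- A message without a key goes to one fixed partition in SinglePartition mode.
- A message without a key rotates across partitions in RoundRobinPartition mode.

The enum constants mirror the MessageRoutingMode names, but `PartitionRouterModel` is a hypothetical local class. Using `String.hashCode()` with a floor modulo, and passing the fixed partition in explicitly, are assumptions for illustration.

```java
// Simplified partition routing; mode names mirror MessageRoutingMode, the logic is an assumption.
class PartitionRouterModel {
    enum Mode { SinglePartition, RoundRobinPartition }

    private final Mode mode;
    private final int numPartitions;
    private final int fixedPartition; // used by SinglePartition for messages without a key
    private int counter = 0;

    PartitionRouterModel(Mode mode, int numPartitions, int fixedPartition) {
        this.mode = mode;
        this.numPartitions = numPartitions;
        this.fixedPartition = fixedPartition;
    }

    int route(String keyOrNull) {
        if (keyOrNull != null) {
            // Both modes hash the key, so one key always lands in one partition.
            return Math.floorMod(keyOrNull.hashCode(), numPartitions);
        }
        if (mode == Mode.SinglePartition) {
            return fixedPartition; // per-producer ordering
        }
        return Math.floorMod(counter++, numPartitions); // spread unkeyed messages
    }
}
```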

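Hashing scheme

The hashing scheme is an enum that represents the set of standard hashing functions available for choosing the partition of a particular message.

Two standard hashing functions are available: JavaStringHash and Murmur3_32Hash. The default hashing function for producers is JavaStringHash. Note that JavaStringHash is not useful when producers may use clients written in different languages; in that case, Murmur3_32Hash is recommended.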
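The cross-language caveat comes from what each function hashes. `String.hashCode()` is defined by Java over UTF-16 characters, whereas MurmurHash3 is a published algorithm that any language can apply to the same bytes. Below is a standard MurmurHash3 x86_32 with seed 0, applied to UTF-8 bytes. Whether Pulsar's clients use exactly this encoding and seed is an assumption, and `HashSchemes` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;

// Two string hashes for picking partitions; encoding/seed equivalence with Pulsar is an assumption.
final class HashSchemes {
    // Java-specific: s[0]*31^(n-1) + ... + s[n-1] over UTF-16 chars.
    static int javaStringHash(String key) {
        return key.hashCode();
    }

    // Standard MurmurHash3 x86_32 over UTF-8 bytes, seed 0.
    static int murmur3_32(String key) {
        byte[] data = key.getBytes(StandardCharsets.UTF_8);
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = 0;
        int len = data.length;
        int blocks = len / 4;
        for (int i = 0; i < blocks; i++) {
            int k1 = (data[4 * i] & 0xff)
                    | (data[4 * i + 1] & 0xff) << 8
                    | (data[4 * i + 2] & 0xff) << 16
                    | (data[4 * i + 3] & 0xff) << 24; // little-endian block
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int tailK = 0;
        int tail = blocks * 4;
        switch (len & 3) {
            case 3: tailK ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: tailK ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                tailK ^= (data[tail] & 0xff);
                tailK *= c1;
                tailK = Integer.rotateLeft(tailK, 15);
                tailK *= c2;
                h1 ^= tailK;
                break;
            default:
                break;
        }
        h1 ^= len;
        // Final avalanche (fmix32)
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    static int partition(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }
}
```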

By default, Pulsar persistently stores all unacknowledged messages on multiple BookKeeper bookies (storage nodes). Message data on persistent topics can therefore survive broker restarts and subscriber failover.

However, Pulsar also supports non-persistent topics. On these topics, messages are never persisted to disk and live only in memory. With non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber from the topic loses all in-transit messages on that (non-persistent) topic, so clients may see message loss.

Non-persistent topics have names of this form (note the non-persistent in the name):

```
non-persistent://tenant/namespace/topic
```


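In non-persistent topics, brokers immediately deliver messages to all connected subscribers without persisting them in BookKeeper. If a subscriber is disconnected, the broker cannot redeliver those in-transit messages, and the subscriber will never receive them. Skipping the persistent storage step makes messaging on non-persistent topics slightly faster than on persistent topics in some cases, at the cost of some of Pulsar's core benefits.

With non-persistent topics, message data lives only in memory. If a message broker fails, or message data cannot be retrieved from memory for some other reason, your message data may be lost. Use non-persistent topics only if you’re certain that your use case requires it and can sustain it.

Non-persistent topics are enabled by default on Pulsar brokers. You can disable them in the broker configuration. You can manage non-persistent topics with the pulsar-admin topics command.

Performance

Non-persistent messaging is usually faster than persistent messaging because brokers don’t persist messages and immediately send acks back to the producer as soon as that message is delivered to connected brokers. As a result, producers see comparatively low publish latency on non-persistent topics.

Client API

Producers and consumers connect to non-persistent topics in the same way as to persistent topics. The crucial difference is that the topic name must start with non-persistent. All three subscription modes (exclusive, shared and failover) are supported for non-persistent topics.

Here is an example Java consumer for a non-persistent topic: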

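```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
// The non-persistent:// prefix is the only difference from a persistent topic
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";

Consumer<byte[]> consumer = client.newConsumer()
        .topic(npTopic)
        .subscriptionName(subscriptionName)
        .subscribe();
```

Here is an example Java producer for the same non-persistent topic:

```java
// Reuses client and npTopic from the consumer example (import org.apache.pulsar.client.api.Producer)
Producer<byte[]> producer = client.newProducer()
        .topic(npTopic)
        .create();
```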

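Message retention and expiry

By default, Pulsar message brokers:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • persistently store all unacknowledged messages in a message backlog.

Pulsar has two features, however, that let you override this default behavior: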

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the cookbook.

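The diagram below illustrates both concepts:

The top of the diagram shows message retention. A retention policy applied to all topics in a namespace specifies that some messages are durably stored in Pulsar even though they have already been acknowledged. Acknowledged messages that the retention policy does not cover are deleted. Without a retention policy, all of the acknowledged messages would be deleted.

The bottom of the diagram shows message expiry. Some messages are deleted even though they have not been acknowledged, because they have expired according to the TTL applied to the namespace. For example, a TTL of 5 minutes applies and the messages are 10 minutes old without having been acknowledged.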
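The two policies can be combined in a short model. TTL expiry turns old unacknowledged messages into acknowledged ones, and retention then decides which acknowledged messages are kept. Two simplifications are assumptions of this sketch: expiry is modeled as marking messages acknowledged, and retention is time-based only. `RetentionExpiryModel` and `Msg` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified retention vs. expiry for one subscription; not Pulsar's implementation.
class RetentionExpiryModel {
    static final class Msg {
        final String id;
        final long publishTimeMillis;
        boolean acked;

        Msg(String id, long publishTimeMillis, boolean acked) {
            this.id = id;
            this.publishTimeMillis = publishTimeMillis;
            this.acked = acked;
        }
    }

    // Expiry (TTL): unacknowledged messages older than the TTL are treated as acknowledged.
    static void applyTtl(List<Msg> msgs, long nowMillis, long ttlMillis) {
        for (Msg m : msgs) {
            if (!m.acked && nowMillis - m.publishTimeMillis > ttlMillis) {
                m.acked = true;
            }
        }
    }

    // Retention: acknowledged messages inside the retention window are kept; the rest can be deleted.
    static List<String> deletable(List<Msg> msgs, long nowMillis, long retentionMillis) {
        List<String> out = new ArrayList<>();
        for (Msg m : msgs) {
            if (m.acked && nowMillis - m.publishTimeMillis > retentionMillis) {
                out.add(m.id);
            }
        }
        return out;
    }
}
```

A retention window of 0 stands for "no retention policy", in which case every acknowledged message is deletable.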

Message deduplication

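Message duplication occurs when Pulsar [persists](/docs/zh-CN/concepts-architecture-overview#persistent-storage) a message more than once. Message deduplication is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, even if the message is received more than once.

The following diagram illustrates what happens when message deduplication is disabled and when it is enabled:

Figure: Pulsar message deduplication

In the top scenario, message deduplication is disabled. A producer publishes message 1 on a topic, and the message reaches a Pulsar broker and is persisted to BookKeeper. The producer then sends message 1 again (for example, because of some retry logic). The broker receives the message and stores it in BookKeeper again, so duplication has occurred.

In the second scenario, the producer publishes message 1, and the broker receives and persists it, as in the first scenario. However, when the producer tries to publish the message again, the broker knows that it has already seen message 1 and does not persist it again.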
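The second scenario can be sketched on the broker side. The sketch assumes that each producer has a name and tags its messages with increasing sequence IDs, and that the broker remembers the highest ID it has persisted per producer. `DeduplicationModel` is hypothetical, and the sketch ignores persistence of the deduplication state itself.

```java
import java.util.HashMap;
import java.util.Map;

// Broker-side dedup sketch, assuming each producer sends increasing sequence IDs.
class DeduplicationModel {
    private final boolean enabled;
    private final Map<String, Long> highestPersistedSeq = new HashMap<>();
    private int persistedCount;

    DeduplicationModel(boolean enabled) {
        this.enabled = enabled;
    }

    // Returns true if the message is written to storage, false if it is dropped as a duplicate.
    boolean publish(String producerName, long sequenceId) {
        if (enabled) {
            long last = highestPersistedSeq.getOrDefault(producerName, -1L);
            if (sequenceId <= last) {
                return false; // already seen: do not persist again
            }
            highestPersistedSeq.put(producerName, sequenceId);
        }
        persistedCount++;
        return true;
    }

    int persistedCount() {
        return persistedCount;
    }
}
```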

Producer idempotency

The other available approach to message deduplication is to ensure that each message is only produced once. This approach is typically called producer idempotency. Its drawback is that it pushes the work of message deduplication onto the application. In Pulsar, deduplication is handled at the broker level, so you do not need to modify your Pulsar client code. Instead, you only need to make administrative changes. For details, see Managing message deduplication.

Deduplication and effectively-once semantics

Message deduplication makes Pulsar an ideal messaging system to use together with stream processing engines (SPEs) and other systems that provide effectively-once processing semantics. Messaging systems without automatic message deduplication require the SPE or other system to guarantee deduplication. Strict message ordering then comes at the cost of making the application responsible for deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.

You can find more in-depth information in this post.

Delayed message delivery

Delayed message delivery lets you consume a message later rather than immediately. In this mechanism, a message is stored in BookKeeper. After the message is published to a broker, DelayedDeliveryTracker keeps a time index (time -> messageId) in memory. The message is delivered to a consumer once the specified delay has passed.

Delayed message delivery only works in Shared subscription mode. In Exclusive and Failover subscription modes, the delayed message is dispatched immediately.

The diagram below illustrates the concept of delayed message delivery:

A broker saves a message without any check. When a consumer consumes a message, if the message is set to delay, then the message is added to DelayedDeliveryTracker. A subscription checks and gets timeout messages from DelayedDeliveryTracker.
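The in-memory time index can be sketched as a priority queue ordered by delivery time. This is a simplified model, not Pulsar's DelayedDeliveryTracker. `DelayedDeliveryTrackerModel`, `addMessage()` and `getScheduledMessages()` are hypothetical names. Time is passed in explicitly, and a message whose delivery time has already arrived is reported as immediately dispatchable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified in-memory time index (deliverAt -> messageId); not Pulsar's DelayedDeliveryTracker.
class DelayedDeliveryTrackerModel {
    private static final class Entry {
        final long deliverAtMillis;
        final String messageId;

        Entry(long deliverAtMillis, String messageId) {
            this.deliverAtMillis = deliverAtMillis;
            this.messageId = messageId;
        }
    }

    private final PriorityQueue<Entry> index =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAtMillis, b.deliverAtMillis));

    // Returns true if the message is held back, false if it can be dispatched now.
    boolean addMessage(String messageId, long deliverAtMillis, long nowMillis) {
        if (deliverAtMillis <= nowMillis) {
            return false;
        }
        index.add(new Entry(deliverAtMillis, messageId));
        return true;
    }

    // Messages whose delay has elapsed, earliest first.
    List<String> getScheduledMessages(long nowMillis) {
        List<String> due = new ArrayList<>();
        while (!index.isEmpty() && index.peek().deliverAtMillis <= nowMillis) {
            due.add(index.poll().messageId);
        }
        return due;
    }
}
```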


The following is an example of delayed message delivery for a producer in Java:

```java
// Deliver 3 minutes after publishing (requires import java.util.concurrent.TimeUnit)
producer.newMessage().deliverAfter(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send();
```