Topic compaction

如要使用压缩:

  • You need to give messages keys, as topic compaction in Pulsar takes place on a per-key basis (i.e. messages are compacted based on their key). 对于股票报价器这个使用场景,股票代码 或 GOOG-—都可以用作这里需要的 key (详情请看 下面)。 没有 key 的消息会被压缩过程单独留出。
  • 压缩可以配置为,也可以通过 Pulsar Admin API 去触发压缩。

如果要说主题能从压缩中得到什么好处,一个很好的示例就是股票报价器主题,因为消费者们能从中获取到指定股票的最新消息。 想象这样一个场景:包含股票数据的消息使用股票代码作为 key (如 GOOG, AAPL, TWTR)。 压缩这个主题将为消费者在这个主题上提供两个选项:

  • 如果他们需要访问“历史”值(主题的所有信息),他们可以从“原始”非压缩主题读取。
  • 如果他们只想看到最新消息,他们就可以从压缩主题中读取。

因此,如果你在使用一个叫做 stock-values(股票值) 的 Pulsar 主题,即便一些消费者能访问主题中的所有消息(也许因为他们需要对过去几个小时的变化做计算),他们也会习惯性地使用实时股票报价器来仅仅查看压缩主题(从而不会强制处理那些过期的消息)。 要从哪种主题中拉取消息是由消费者的 决定的。

比如,当积压达到 100MB 时触发压缩:

为命名空间配置的压缩阈值将适用于命名空间中的所有主题。

为了在主题上执行压缩,你需要在 pulsar-admin 命令行工具使用 命令。 下面是一个示例:

  1. $ bin/pulsar-admin topics compact \

pulsar-admin 工具通过 Pulsar REST API 来执行压缩。 To run compaction in its own dedicated process, i.e. not through the REST API, you can use the command. 下面是一个示例:

  1. $ bin/pulsar compact-topic \
  2. --broker-conf /path/to/broker.conf \
  3. --topic persistent://my-tenant/my-namespace/my-topic
  4. $ bin/pulsar compact-topic \
  5. --topic persistent://my-tenant/my-namespace/my-topic

我该在什么时候触发压缩?

的频率会因使用场景而产生很大差异。 如果你想要一个压缩主题有极快的读取速度,那么你应该相当频繁地执行压缩。

Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar’s language clients. If the

In order to read from a compacted topic using a Java consumer, the readCompacted parameter must be set to true. Here’s an example consumer for a compacted topic:

As mentioned above, topic compaction in Pulsar works on a per-key basis. 这意味着您在压缩主题上生成的消息需要有 key(key 的内容将取决于你的用例)。 没有 key 的消息会被压缩过程单独留出。 下面是一个带 key 的 Pulsar 消息示例:

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageBuilder;
  3. Message<byte[]> msg = MessageBuilder.create()
  4. .setContent(someByteArray)

下面的示例展示了一个带 key 的消息在 Pulsar 压缩主题上被生产的过程。