Message retention and expiry
因此,在一个没有创建任何订阅的主题上不会保留任何消息(默认情况下)。
(注意,不再被存储的消息不需要立即删除,事实上可能仍然可以被访问,直到下一个 ledger 滚动创建。 因为客户端无法预测 ledger 什么时候发生滚动,所以依赖滚动发生在合适的时间是不明智的。)
在 Pulsar 中,你有两种方式在命名空间的级别去修改这种行为:
- 你可以通过设置消息保留策略持久化存储不在 backlog 内的消息(因为他们已经在每个现有的订阅上被确认,或者并没有被订阅)。
- 可以通过设置(TTL),设置消息在指定的时间内不被确认的话,自动确认。
你可以通过 Pulsar admin 接口 在命名空间(租户、集群或集群) 级别管理消息保留策略和TTL。
默认情况下,当消息到达 broker 时,会一直保留这条消息直到消费者确认已消费到这条消息,此时它才被删除。 You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention is based on both a size limit and a time limit.
当使用 Reader 接口时,保留策略是有用的。 因为 Reader 接口不使用消息确认机制,消息将永远不会存在 backlog 中。 在只读的使用场景中,保留时间是必须配置的。
When you set a retention policy on topics in a namespace, you must set both a size limit and a time limit. 您可以参考以下表格来设置 和 Java 中的保留策略。
消息保留策略配置会对没有订阅的主题的所有消息生效,或者对有订阅的主题并已被所有消费者确认的消息生效。 保留策略设置不影响订阅主题的未确认消息。 未确认的信息受积压配额控制。
当超过保留限额时,最老的消息会被标记删除,直到保留的消息在指定的限制范围之内。
你可以通过以下两个参数来配置实例级别的消息保留策略:: defaultRetentionTimeInminutes
and defaultReintentionSizeInMB
默认情况下,这两个参数都设置为 0
。
如果需要了解两个参数的更多信息,请参阅 broker.conf 配置文件。
设置保留策略
你可以通过在 pulsar-admin
、REST API 和 Java 中自定命名空间、大小限制和时间限制来设置命名空间的保留策略。
pulsar-admin
REST API
Java
使用set-retention子命令并指定命名空间,使用-s
/--size
参数指定大小限制,使用-t
/--time
参数指定时间限制。
- When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained.
- After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained.
In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 1T \ --time -1
In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time 3h
要实现无限留存,将两个值都设置为 -1
。
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time -1
要禁用消息保留恶略配置,将值设置为 0
。
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 0 \ --time 0
int retentionTime = 10; // 10 minutesint retentionSize = 500; // 500 megabytesRetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);admin.namespaces().setRetention(namespace, policies);
获取保留策略
You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes
and retentionSizeInMB
.
pulsar-admin
使用 get-retention子命令并指定命名空间。
示例
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 500
}
REST API
Java
Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.
You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:
待办:如何对每个主题或每个 backlog 进行扩展?
- an allowable size threshold for each topic in the namespace
可以使用以下保留策略:
策略 | 触发的操作 |
---|---|
producer_request_hold | Broker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储 |
producer_exception | Broker 会与客户端断开连接并抛出异常 |
consumer_backlog_eviction | Broker 将开始丢弃backlog的消息 |
Backlog quotas are handled at the namespace level. They can be managed via:
您可以通过指定命名空间、大小限制和/或时间限制(以秒为单位)以及按名称指定的策略,为中的所有主题设置大小或时间阈值(可以同时设置大小和时间阈值或者只设置其中一个阈值)以及积压保留策略。
pulsar-admin
使用子命令,并使用 -l
/--limit
参数指定命名空间大小限制 ,以及使用 <code>-p
/--policy
参数指定保留策略。
示例
$ pulsar-admin namespaces set-backlog-quot-tenant/my-ns \
--limit 2G
--limitTime 36000 \
--policy producer_request_hold
REST API
Java
long sizeLimit = 2147483648L;
BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
admin.namespaces().setBacklogQuota(namespace, quota);
获取积压阈值和积压保留政策
可以查看已对命名空间应用的大小阈值和 backlog 保留策略。
pulsar-admin
Use the subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
"destination_storage": {
"limit" : 2147483648,
"policy" : "producer_request_hold"
}
}
REST API
Java
Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
admin.namespaces().getBacklogQuotas(namespace);
移除 backlog quota 策略
pulsar-admin
Use the subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
REST API
Java
admin.namespaces().removeBacklogQuota(namespace);
pulsar-admin
使用 clear-backlog 子命令。
示例
By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f
/--force
flag.
默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。
设置命名空间的TTL
pulsar-admin
使用set-message-ttl子命令并指定命名空间和TTL(以秒为单位,使用-ttl
/--messageTTL
参数指定)。
示例
$ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
--messageTTL 120 # TTL of 2 minutes
REST API
Java
admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);
获取命名空间的 TTL 配置
pulsar-admin
使用 子命令并指定命名空间。
示例
$ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
60
REST API
GET /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().getNamespaceMessageTTL(namespace)
pulsar-admin
使用 子命令并指定命名空间。
示例
$ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
如果没有任何保留期,且从未有过很多积压, 已被确认消息的保留时间等于 Pulsar segment 滚动周期 + entry 日志滚动期 + (garbage collection interval * 垃圾收集比率)。
Segment rollover period: basically, the segment rollover period is how often a new segment is created. 一旦创建新的片段,旧的片段会被删除。 默认情况下,这种情况会在你已写入50,000条条目 (消息) 时或者等待240分钟之后发生。 你可以在broker服务端调整这个策略。
Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. 对于已删除的 ledger ,必须将条目日志 entry log 全部滚动。 Entry log 滚动周期是可配置的,但完全基于条目日志的大小。 详情参阅 。 Entry log 一经滚动,就可以进行垃圾回收 entry log。