Managing Namespaces

    命名空间的管理方式:

    可以在给定的下创建命名空间。

    pulsar-admin

    REST API

    Java

    使用 create 命令,指定命名空间的名称:

    1. admin.namespaces().createNamespace(namespace);

    获取策略

    用户可以随时获取与命名空间相关的现有策略。

    pulsar-admin

    REST API

    Java

    使用 子命令,指定命名空间:

    1. $ pulsar-admin namespaces policies test-tenant/test-namespace{ "auth_policies": { "namespace_auth": {}, "destination_auth": {} }, "replication_clusters": [], "bundles_activated": true, "bundles": { "boundaries": [ "0x00000000", "0xffffffff" ], "numBundles": 1 }, "backlog_quota_map": {}, "persistence": null, "latency_stats_sample_rate": {}, "message_ttl_in_seconds": 0, "retention_policies": null, "deleted": false}

    GET /admin/v2/namespaces/:tenant/:namespace

    1. admin.namespaces().getPolicies(namespace);

    获取命名空间列表

    给定 Pulsar 租户,可以列出其中所有的命名空间。

    pulsar-admin

    REST API

    Java

    使用 子命令,指定租户:

    1. $ pulsar-admin namespaces list test-tenanttest-tenant/ns1test-tenant/ns2

    GET /admin/v2/namespaces/:tenant

      删除命名空间

      可以删除租户下已经存在的命名空间。

      pulsar-admin

      REST API

      Java

      使用 delete 子命令,指定命名空间:

      1. $ pulsar-admin namespaces delete test-tenant/ns1

      1. admin.namespaces().deleteNamespace(namespace);

      配置复制集群

      设置复制集群

      为命名空间设置复制集群,因此 Pulsar 可以从一个 colo 复制消息到另一个 colo。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces set-clusters test-tenant/ns1 \ --clusters cl1
      1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/replication|operation/setNamespaceReplicationClusters?version=[[pulsar:version_number]]}
      1. admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);

      获取复制集群

      获取给定命名空间复制集群的列表。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces get-clusters test-tenant/cl1/ns1
      1. cl2
      1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/replication|operation/getNamespaceReplicationClusters?version=[[pulsar:version_number]]}
      1. admin.namespaces().getNamespaceReplicationClusters(namespace)

      设置 backlog quota 策略

      Backlog quota helps the broker to restrict bandwidth/storage of a namespace once it reaches a certain threshold limit. Admin can set the limit and take corresponding action after the limit is reached.

      1. producer_request_hold:broker 暂停运行,并不再持久化生产请求负载

      2. producer_exception:broker 抛出异常,并与客户端断开连接。

      3. consumer_backlog_eviction:broker 丢弃积压消息

      可以通过定义 backlog-quota-type: destination_storage 来限制积压配额

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces set-backlog-quota --limit 10G --limitTime 36000 --policy producer_request_hold test-tenant/ns1
      1. N/A
      1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/setBacklogQuota?version=[[pulsar:version_number]]}
      1. admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, limitTime, policy))

      获取 backlog quota 策略

      查看给定命名空间已配置的积压配额。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
      1. { "destination_storage": { "limit": 10, "policy": "producer_request_hold" }}
      1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap|operation/getBacklogQuotaMap?version=[[pulsar:version_number]]}
      1. admin.namespaces().getBacklogQuotaMap(namespace);

      移除 backlog quota 策略

      移除指定命名空间的积压配额策略

      REST API

      Java

      1. $ pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
      1. N/A
      1. {@inject: endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/removeBacklogQuota?version=[[pulsar:version_number]]}
      1. admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)

      配置持久化策略

      设置持久化策略

      持久化策略可以为给定命名空间下 topic 上的所有消息配置持久等级。

      • Bookkeeper-ack-quorum:每个 entry 在等待的 acks(有保证的副本)数量,默认值:0

      • Bookkeeper-ensemble:单个 topic 使用的 bookie 数量,默认值:0

      • Bookkeeper-write-quorum:每个 entry 要写入的次数,默认值:0

      • Ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1
      1. N/A
      1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/persistence|operation/setPersistence?version=[[pulsar:version_number]]}
      1. admin.namespaces().setPersistence(namespace,new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate))

      获取持久化策略

      可通过以下方式查看指定命名空间配置的持久化策略。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces get-persistence test-tenant/ns1
      1. { "bookkeeperEnsemble": 3, "bookkeeperWriteQuorum": 2, "bookkeeperAckQuorum": 2, "managedLedgerMaxMarkDeleteRate": 0}
      1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/persistence|operation/getPersistence?version=[[pulsar:version_number]]}
      1. admin.namespaces().getPersistence(namespace)

      配置命名空间 bundle

      卸载命名空间 bundle

      命名空间 bundle 是虚拟的 topic 组,这些 topic 来自同一个命名空间。 如果由于 bundle 数量过多,导致 broker 过载,可以通过以下命令卸载 bundle 到负载较小的 broker。 命名空间 bundle ID 为十六进制哈希值,范围为 0x000000 到 0xffffff。

      pulsar-admin

      REST API

      Java

      1. N/A
      1. {@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace/{bundle}/unload|operation/unloadNamespaceBundle?version=[[pulsar:version_number]]}
      1. admin.namespaces().unloadNamespaceBundle(namespace, bundle)

      拆分命名空间 bundle

      命名空间包可以包含多个 topic,但只能由一个 broker 来提供服务。 如果单个 bundle 使 broker 过载,管理员可以通过此命令拆分 bundle,并通过卸载拆分后的 bundle 使 broker 之间负载均衡。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-tenant/ns1
      1. N/A
      1. {@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace/{bundle}/split|operation/splitNamespaceBundle?version=[[pulsar:version_number]]}
      1. admin.namespaces().splitNamespaceBundle(namespace, bundle)

      配置消息存活时间(TTL)

      Set message-ttl

      配置消息存活时间 (以秒为单位)。

      pulsar-admin

      REST API

      Java

      1. N/A
      1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/setNamespaceMessageTTL?version=[[pulsar:version_number]]}
      1. admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)

      Get message-ttl

      获取命名空间中消息的存活时间(ttl)。

      pulsar-admin

      REST API

      Java

      1. $ pulsar-admin namespaces get-message-ttl test-tenant/ns1
      1. 100
      1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/getNamespaceMessageTTL?version=[[pulsar:version_number]]}
      1. admin.namespaces().getNamespaceMessageTTL(namespace)

      Remove message-ttl

      Remove a message TTL of the configured namespace.

      pulsar-admin

      REST API

      Java

        1. 100
        1. {@inject: endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/removeNamespaceMessageTTL?version=[[pulsar:version_number]]}
        1. admin.namespaces().removeNamespaceMessageTTL(namespace)

        清除积压消息

        清除命名空间 backlog

        It clears all message backlog for all the topics that belong to a specific namespace. You can also clear backlog for a specific subscription as well.

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces clear-backlog --sub my-subscription test-tenant/ns1
        1. N/A
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/clearBacklog|operation/clearNamespaceBacklogForSubscription?version=[[pulsar:version_number]]}
        1. admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)

        清除 bundle backlog

        It clears all message backlog for all the topics that belong to a specific NamespaceBundle. You can also clear backlog for a specific subscription as well.

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces clear-backlog --bundle 0x00000000_0xffffffff --sub my-subscription test-tenant/ns1
        1. N/A
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/{bundle}/clearBacklog|operation?version=[[pulsar:version_number]]/clearNamespaceBundleBacklogForSubscription}
        1. admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)

        设置消息保留参数

        命名空间包含多个 topic,每个 topic 的保留大小(存储大小)或保留时间不应超过特定阈值。 可通过以下命令配置指定命名空间中 topic 的保留大小和保留时间。

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin set-retention --size 100 --time 10 test-tenant/ns1
        1. N/A
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/retention|operation/setRetention?version=[[pulsar:version_number]]}
        1. admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))

        获取消息保留参数

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces get-retention test-tenant/ns1
        1. { "retentionTimeInMinutes": 10, "retentionSizeInMB": 100}
        1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/retention|operation/getRetention?version=[[pulsar:version_number]]}

        配置 topic 的消息派发速率

        设置 topic 的消息派发速率

        为给定命名空间下所有 topic 设置消息派发速率。 通过每 X 秒派发的消息数量(msg-dispatch-rate)或者每 X 秒派发消息的总字节数来限制(byte-dispatch-rate)消息派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period 来配置。 msg-dispatch-ratebyte-dispatch-rate 的默认值均为 -1,即禁用配额限制。

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \ --msg-dispatch-rate 1000 \ --byte-dispatch-rate 1048576 \ --dispatch-rate-period 1
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/dispatchRate|operation/setDispatchRate?version=[[pulsar:version_number]]}
        1. admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))

        获取 topic 的消息派发速率

        获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
        1. { "dispatchThrottlingRatePerTopicInMsg" : 1000, "dispatchThrottlingRatePerTopicInByte" : 1048576, "ratePeriodInSecond" : 1}
        1. admin.namespaces().getDispatchRate(namespace)

        配置订阅的消息派发速率

        设置订阅的消息派发速率

        为给定命名空间下 topic 中的所有订阅设置消息派发速率。 通过每 X 秒派发的消息数量(msg-dispatch-rate)或者每 X 秒派发消息的总字节数来限制(byte-dispatch-rate)消息派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period 来配置。 msg-dispatch-ratebyte-dispatch-rate 的默认值均为 -1,即禁用配额限制。

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \ --msg-dispatch-rate 1000 \ --byte-dispatch-rate 1048576 \ --dispatch-rate-period 1
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/subscriptionDispatchRate|operation/setDispatchRate?version=[[pulsar:version_number]]}
        1. admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))

        获取订阅的消息派发速率

        获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
        1. { "dispatchThrottlingRatePerTopicInMsg" : 1000, "dispatchThrottlingRatePerTopicInByte" : 1048576, "ratePeriodInSecond" : 1}
        1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/subscriptionDispatchRate|operation/getDispatchRate?version=[[pulsar:version_number]]}
        1. admin.namespaces().getSubscriptionDispatchRate(namespace)

        配置复制器的消息派发速率

        设置复制器的消息派发速率

        为给定命名空间下复制集群之间的所有复制器设置消息派发速率。 通过每 X 秒派发的消息数量(msg-dispatch-rate)或者每 X 秒派发消息的总字节数来限制(byte-dispatch-rate)消息派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period 来配置。 msg-dispatch-ratebyte-dispatch-rate 的默认值均为 -1,即禁用配额限制。

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \ --msg-dispatch-rate 1000 \ --byte-dispatch-rate 1048576 \ --dispatch-rate-period 1
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/replicatorDispatchRate|operation/setDispatchRate?version=[[pulsar:version_number]]}
        1. admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))

        获取复制器的消息派发速率

        获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1
        1. { "dispatchThrottlingRatePerTopicInMsg" : 1000, "dispatchThrottlingRatePerTopicInByte" : 1048576, "ratePeriodInSecond" : 1}
        1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/replicatorDispatchRate|operation/getDispatchRate?version=[[pulsar:version_number]]}
        1. admin.namespaces().getReplicatorDispatchRate(namespace)

        配置去重快照间隔

        获取去重快照间隔

        获取命名空间中的 deduplicationSnapshotInterval 参数值(命名空间下的每个 topic 都根据此间隔进行去重快照)

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces get-deduplication-snapshot-interval test-tenant/ns1
        1. {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
        1. admin.namespaces().getDeduplicationSnapshotInterval(namespace)

        设置去重快照间隔

        为命名空间设置 deduplicationSnapshotInterval。 命名空间下的每个 topic 都根据此间隔进行去重快照。 使用此属性,需将 brokerDeduplicationEnabled 设置为 true

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces set-deduplication-snapshot-interval test-tenant/ns1 --interval 1000
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
        1. { "interval": 1000}
        1. admin.namespaces().setDeduplicationSnapshotInterval(namespace, 1000)

        移除去重快照间隔

        移除命名空间中的 deduplicationSnapshotInterval 参数(命名空间下的每个 topic 都根据此间隔进行去重快照)

        pulsar-admin

        REST API

        Java

        1. $ pulsar-admin namespaces remove-deduplication-snapshot-interval test-tenant/ns1
        1. {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
        1. admin.namespaces().removeDeduplicationSnapshotInterval(namespace)

        You can use the Pulsar isolation policy to allocate resources (broker and bookie) for a namespace.

        从 Broker 上卸载命名空间

        你能从这Pulsar broker 管理上卸载一个命名空间,或一个 。

        pulsar-admin

        Use the subcommand of the namespaces command.

        pulsar-admin

        REST

        Java

        1. $ pulsar-admin namespaces unload my-tenant/my-ns