以下说明和命令中所使用 topic 的名称结构如下:

    获取非持久化 topic 的统计数据。

    • msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second

    • msgThroughputIn: Same as above, but in bytes per second instead of messages per second

    • msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second

    • msgThroughputOut: Same as above, but in bytes per second instead of messages per second

    • averageMsgSize: The average size in bytes of messages published within the last interval

    • publishers: The list of all local publishers into the topic. There can be zero or thousands

    • averageMsgSize: Average message size in bytes from this publisher within the last interval

    • producerId: Internal identifier for this producer on this topic

    • producerName: Internal identifier for this producer, generated by the client library

    • address: IP address and source port for the connection of this producer

    • subscriptions: The list of all local subscriptions to the topic

    • my-subscription: The name of this subscription (client defined)

    • consumers: The list of connected consumers for this subscription

    • consumerName: Internal identifier for this consumer, generated by the client library

    • availablePermits: The number of messages this consumer has space for in the client library’s listen queue. 值小于 1 意味着客户端库队列已满,不能继续调用 receive()。 非负整数值意味着 consumer 可以随时接收消息。

    • replication: This section gives the stats for cross-colo replication of this topic

    • connected: Whether the outbound replicator is connected

    • inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker

    • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。

    • msgDropRate: for publisher: publish: broker only allows configured number of in flight per connection, and drops all other published messages above the threshold. 若限制不可用或连接不可写入,broker 也会丢弃订阅的消息。

    1. "msgRateIn": 4641.528542257553,
    2. "msgThroughputIn": 44663039.74947473,
    3. "msgRateOut": 0,
    4. "msgThroughputOut": 0,
    5. "averageMsgSize": 1232439.816728665,
    6. "storageSize": 135532389160,
    7. "msgDropRate" : 0.0,
    8. "publishers": [
    9. {
    10. "msgRateIn": 57.855383881403576,
    11. "msgThroughputIn": 558994.7078932219,
    12. "averageMsgSize": 613135,
    13. "producerId": 0,
    14. "producerName": null,
    15. "address": null,
    16. "connectedSince": null,
    17. "msgDropRate" : 0.0
    18. }
    19. ],
    20. "subscriptions": {
    21. "my-topic_subscription": {
    22. "msgRateOut": 0,
    23. "msgBacklog": 116632,
    24. "type": null,
    25. "consumers" : [ {
    26. "msgRateOut" : 20343.506296021893,
    27. "msgThroughputOut" : 2.0979855364233278E7,
    28. "msgRateRedeliver" : 0.0,
    29. "consumerName" : "fe3c0",
    30. "availablePermits" : 950,
    31. "unackedMessages" : 0,
    32. "blockedConsumerOnUnackedMsgs" : false,
    33. "address" : "/10.73.210.249:60578",
    34. "connectedSince" : "2017-07-26 15:13:48.026-0700",
    35. "clientVersion" : "1.19-incubating-SNAPSHOT"
    36. } ],
    37. "msgDropRate" : 432.2390921571593
    38. }
    39. },
    40. "replication": {}
    41. }

    pulsar-admin

    可以使用 stats 命令来获取 topic 的统计信息。

    1. non-persistent://test-tenant/ns1/tp1 \

    REST API

    Java

    1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
    2. admin.nonPersistentTopics().getStats(topic);

    获取 topic 的详细统计信息。

    pulsar-admin

    可以使用 stats-internal 命令来获取 topic 的内部统计信息。

    REST API

    GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats

    Java

    1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
    2. admin.nonPersistentTopics().getInternalStats(topic);

    Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.

    pulsar-admin

    1. $ bin/pulsar-admin non-persistent create-partitioned-topic \
    2. non-persistent://my-tenant/my-namespace/my-topic \
    3. --partitions 4

    REST API

    PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

    Java

    1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
    2. int numPartitions = 4;
    3. admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);

    Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:

    pulsar-admin

    REST API

    GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

    Java

    1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
    2. admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);

    可以卸载 topic。

    pulsar-admin

    1. $ pulsar-admin non-persistent unload \
    2. non-persistent://test-tenant/ns1/tp1 \

    REST API

    PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload

    Java