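The topic names used in the instructions and commands below have the following structure:
non-persistent://tenant/namespace/topic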
Get the statistics of a non-persistent topic.
msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second
msgThroughputIn: Same as above, but in bytes per second instead of messages per second
msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second
msgThroughputOut: Same as above, but in bytes per second instead of messages per second
averageMsgSize: The average size in bytes of messages published within the last interval
publishers: The list of all local publishers into the topic. There can be anywhere from zero to thousands.
averageMsgSize: Average message size in bytes from this publisher within the last interval
producerId: Internal identifier for this producer on this topic
producerName: Internal identifier for this producer, generated by the client library
address: IP address and source port for the connection of this producer
subscriptions: The list of all local subscriptions to the topic
my-subscription: The name of this subscription (client defined)
consumers: The list of connected consumers for this subscription
consumerName: Internal identifier for this consumer, generated by the client library
availablePermits: The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() is not being called. A positive value means this consumer is ready to be dispatched messages.
replication: This section gives the stats for cross-colo replication of this topic
connected: Whether the outbound replicator is connected
inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If no local publishers are connected, this connection is closed automatically after one minute.
msgDropRate: For publishers, the broker allows only a configured number of in-flight messages per connection and drops all published messages above that threshold. For subscriptions, the broker also drops messages when the limit is unavailable or the connection is not writable.
{
  "msgRateIn": 4641.528542257553,
  "msgThroughputIn": 44663039.74947473,
  "msgRateOut": 0,
  "msgThroughputOut": 0,
  "averageMsgSize": 1232439.816728665,
  "storageSize": 135532389160,
  "msgDropRate": 0.0,
  "publishers": [
    {
      "msgRateIn": 57.855383881403576,
      "msgThroughputIn": 558994.7078932219,
      "averageMsgSize": 613135,
      "producerId": 0,
      "producerName": null,
      "address": null,
      "connectedSince": null,
      "msgDropRate": 0.0
    }
  ],
  "subscriptions": {
    "my-topic_subscription": {
      "msgRateOut": 0,
      "msgBacklog": 116632,
      "type": null,
      "consumers": [
        {
          "msgRateOut": 20343.506296021893,
          "msgThroughputOut": 2.0979855364233278E7,
          "msgRateRedeliver": 0.0,
          "consumerName": "fe3c0",
          "availablePermits": 950,
          "unackedMessages": 0,
          "blockedConsumerOnUnackedMsgs": false,
          "address": "/10.73.210.249:60578",
          "connectedSince": "2017-07-26 15:13:48.026-0700",
          "clientVersion": "1.19-incubating-SNAPSHOT"
        }
      ],
      "msgDropRate": 432.2390921571593
    }
  },
  "replication": {}
}
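The availablePermits and msgDropRate fields described above lend themselves to a simple health check. The following is a minimal sketch under stated assumptions, not part of the Pulsar client library: the StatsCheck class, its nested Consumer and Subscription types, and the warnings method are hypothetical names, and the values in main are copied by hand from the sample output above rather than parsed from JSON.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Pulsar class) that inspects a few stats fields.
class StatsCheck {
    // Holds the two consumer fields this sketch looks at.
    static final class Consumer {
        final String consumerName;
        final int availablePermits;

        Consumer(String consumerName, int availablePermits) {
            this.consumerName = consumerName;
            this.availablePermits = availablePermits;
        }
    }

    // Holds the subscription name, its msgDropRate, and its connected consumers.
    static final class Subscription {
        final String name;
        final double msgDropRate;
        final List<Consumer> consumers;

        Subscription(String name, double msgDropRate, List<Consumer> consumers) {
            this.name = name;
            this.msgDropRate = msgDropRate;
            this.consumers = consumers;
        }
    }

    // Returns one warning per problem found:
    // a positive msgDropRate means the broker is dropping messages for the subscription;
    // availablePermits of 0 or less means the consumer's receive queue is full.
    static List<String> warnings(Subscription sub) {
        List<String> out = new ArrayList<>();
        if (sub.msgDropRate > 0.0) {
            out.add(sub.name + ": broker is dropping messages");
        }
        for (Consumer c : sub.consumers) {
            if (c.availablePermits <= 0) {
                out.add(sub.name + "/" + c.consumerName + ": receive queue is full");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Values taken from the sample stats output.
        Subscription sample = new Subscription(
                "my-topic_subscription", 432.2390921571593,
                List.of(new Consumer("fe3c0", 950)));
        warnings(sample).forEach(System.out::println);
    }
}
```

For the sample values, only the drop-rate warning applies, since the single consumer still has 950 permits available.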
pulsar-admin
You can use the stats command to get the statistics of a topic.
$ pulsar-admin non-persistent stats \
  non-persistent://test-tenant/ns1/tp1
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/stats
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getStats(topic);
Get the detailed statistics of a topic.
pulsar-admin
You can use the stats-internal command to get the internal statistics of a topic.
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getInternalStats(topic);
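The REST endpoint above can also be called without the admin client. The sketch below only builds the request with the JDK's java.net.http API (Java 11 or later) and does not send it. The InternalStatsRequest class and its build method are hypothetical names, the base URL http://localhost:8080 is an assumed broker address, and the path segments are inserted without URL encoding, so it assumes plain tenant, namespace, and topic names.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) the internalStats request.
class InternalStatsRequest {
    // Fills the :tenant, :namespace and :topic placeholders of
    // GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats
    static HttpRequest build(String baseUrl, String tenant, String namespace, String topic) {
        URI uri = URI.create(String.format(
                "%s/admin/v2/non-persistent/%s/%s/%s/internalStats",
                baseUrl, tenant, namespace, topic));
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with your broker's web service URL.
        HttpRequest request = build("http://localhost:8080",
                "my-tenant", "my-namespace", "my-topic");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To execute the request, pass it to an HttpClient, for example HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Authentication, if your cluster requires it, is not covered here.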
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic, you need to provide a name for the topic as well as the desired number of partitions.
pulsar-admin
$ bin/pulsar-admin non-persistent create-partitioned-topic \
non-persistent://my-tenant/my-namespace/my-topic \
--partitions 4
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 4;
admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);
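To make the effect of the partition count concrete, the sketch below lists the per-partition topic names that a partitioned topic maps to. It assumes Pulsar's usual convention of appending -partition-N (with N starting at 0) to the topic name. The PartitionNames class and its partitionNames method are hypothetical and are not part of the admin API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives per-partition topic names, assuming the
// "-partition-N" suffix convention with N running from 0 to numPartitions - 1.
class PartitionNames {
    static List<String> partitionNames(String topicName, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topicName + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
        partitionNames(topicName, 4).forEach(System.out::println);
    }
}
```

With the topic name and partition count used above, this prints four names, ending in -partition-0 through -partition-3.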
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
partitions: The number of partitions into which the topic is divided
pulsar-admin
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);
You can unload a topic.
pulsar-admin
$ pulsar-admin non-persistent unload \
  non-persistent://test-tenant/ns1/tp1
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload
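As a rough illustration of the unload endpoint, the sketch below builds the PUT request with the JDK's java.net.http API (Java 11 or later) without sending it. The UnloadRequest class and its build method are hypothetical names, http://localhost:8080 is an assumed broker address, and the request is built with an empty body on the assumption that the endpoint needs none.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) the unload request.
class UnloadRequest {
    // Fills the placeholders of
    // PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload
    static HttpRequest build(String baseUrl, String tenant, String namespace, String topic) {
        URI uri = URI.create(String.format(
                "%s/admin/v2/non-persistent/%s/%s/%s/unload",
                baseUrl, tenant, namespace, topic));
        // Assumption: the endpoint takes no request body.
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Same tenant, namespace and topic as the pulsar-admin example.
        HttpRequest request = build("http://localhost:8080", "test-tenant", "ns1", "tp1");
        System.out.println(request.method() + " " + request.uri());
    }
}
```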