Manage topics
非持久主题应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息发布延迟。 非持久化topic地址的命名格式如下:
无论是持久化还是非持久化主题,你可以通过pulsar-admin
工具、REST API 、 Java 获取到主题资源。
你可以通过以下方式获得特定命名空间下的 topic 列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list \ my-tenant/my-namespace
GET /admin/v2/:schema/:tenant/:namespace
String namespace = "my-tenant/my-namespace";admin.topics().getList(namespace);
授权
通过以下方式可以在客户端角色上授权,以便于在指定的主题上执行具体的操作。
pulsar-admin
REST API
Java
$ pulsar-admin topics grant-permission \ --actions produce,consume --role application1 \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role
String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);admin.topics().grantPermission(topic, role, actions);
获取权限
通过以下方式获取权限认证。
pulsar-admin
REST API
Java
$ pulsar-admin topics permissions \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}
GET /admin/v2/:schema/:tenant/:namespace/:topic/permissions
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPermissions(topic);
取消权限
您可以通过以下方式撤销授予客户端角色的权限。
pulsar-admin
REST API
Java
$ pulsar-admin topics revoke-permission \ --role application1 \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}
DELETE /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role
String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";admin.topics().revokePermissions(topic, role);
删除 topic
您可以通过以下方式删除一个主题。 如果某个主题拥有任何活跃的订阅或者生产者,则不能对其删除。
pulsar-admin
REST API
Java
$ pulsar-admin topics delete \ persistent://test-tenant/ns1/tp1 \
DELETE /admin/v2/:schema/:tenant/:namespace/:topic
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().delete(topic);
卸载 topic
通过以下方式卸载主题。
pulsar-admin
REST API
Java
$ pulsar-admin topics unload \ persistent://test-tenant/ns1/tp1 \
PUT /admin/v2/:schema/:tenant/:namespace/:topic/unload
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().unload(topic);
获取统计信息
你可以检查给定的未分区主题的以下统计数据。
msgRateIn:所有本地和副本发布者每秒发布消息速率之和(msg/s)。
msgThroughputIn:所有本地和副本发布者每秒发布消息字节数之和(byte/s)。
msgRateOut:所有本地和副本消费者每秒调度消息率之和(msg/s)。
msgThroughputOut:所有本地和副本消费者每秒调度消息字节数之和(byte/s)。
averageMsgSize:在最近时间间隔内发布消息的平均大小(以字节为单位)。
storageSize: The sum of the ledgers’ storage size for this topic. 用于存储该主题消息的空间。
publishers: The list of all local publishers into the topic. 该列表的范围从零到千不等。
msgRateIn:发布者每秒发布消息的总速率(msg/s)。
msgThroughputIn:发布者发布消息的总吞吐量(byte/s)。
averageMsgSize:发布者在最近时间间隔内发布消息的平均大小(以字节为单位)。
producerId:该主题对应的生产者内部识别号。
producerName:由客户端生成的生产者内部识别名称。
address:连接生产者所需的 IP 地址和端口号。
connectedSince:生产者上次创建或者重新连接时的时间戳。
subscriptions:该主题下的所有本地订阅列表。
my-subscription:当前订阅的订阅名称。 通过客户端去定义。
msgRateOut:在此订阅中发送消息的总速率(msg/s)。
msgThroughputOut:在此订阅上发送消息的总吞吐量(byte/s)。
msgBacklog:在此订阅上积压的消息数量。
type:订阅类型。
msgRateExpired:由于 TTL 的原因,消息被丢弃而不是发送到此订阅中的比例。
lastExpireTimestamp:最后一条执行消息过期的时间戳。
lastConsumedFlowTimestamp:收到的最后一条流量指令的时间戳。
lastConsumedTimestamp:消费者所有已消费消息的最新时间戳。
lastAckedTimestamp:消费者所有已被 ack 消息的最新时间戳
consumers:连接到此订阅的消费者列表。
msgRateOut:发送给消费者的消息总速率(msg/s)。
msgThroughputOut: 发送给消费者的总吞吐量(byte/s)。
consumerName:由客户端生成的消费者内部标识符。
availablePermits:消费者在客户端库的监听队列中有空闲的消息数量。
0
意味着客户端库的队列已经满了,receive()
方法不会再接收消息。 非 0 值意味着该消费者可以接收消息。unackedMessages:消费者未确认消息的数量。
blockedConsumerOnUnackedMsgs:验证消费者是否因达到未确认消息数的阀值而被阻塞。
lastConsumedTimestamp:消费者最后一次读取消息的时间戳。
lastAckedTimestamp:消费者最后一次确认消息的时间戳。
replication: This section gives the stats for cross-colo replication of this topic
msgRateIn:从远程集群中收到消息的总速率(msg/s)。
msgThroughputIn:从远程集群中收到消息的总吞吐量(byte/s)。
msgRateOut:发送给副本订阅的消息总速率(msg/s)。
msgThroughputOut:发送给副本订阅的消息总吞吐量(byte/s)。
msgRateExpired:过期消息的总速率(msg/s)。
replicationBacklog:待复制到远程集群的消息数量。
connected:验证外部副本连接器是否已经连接。
replicationDelayInSeconds:如果连接是
true
的话,最早消息的已等待被发送的时长。inboundConnection:远程集群中的 broker 要连接到此 broker 的 IP 和端口。
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。
outboundConnection:外部副本连接的地址。
outboundConnectedSince:建立对外连接时的时间戳。
下面是主题状态的示例:
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
使用以下方式来获取一个主题的状态:
pulsar-admin
REST API
Java
$ pulsar-admin topics stats \ persistent://test-tenant/ns1/tp1 \
GET /admin/v2/:schema/:tenant/:namespace/:topic/stats
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getStats(topic);
获取内部统计信息
主题中的详细统计数据如下:
entriesAddedCounter:自此 broker 加载该主题以来发布的消息总量。
numberOfEntries:追踪消息的总数。
totalSize:消息的总存储大小(以字节为单位)。
currentLedgerEntries:当前打开写入操作的 ledger 中,写入的消息总数。
currentLedgerSize:当前打开写入操作的 ledger 中,写入的消息字节大小。
lastLedgerCreatedTimestamp:最后一个 ledger 创建的时间。
lastLedgerCreationFailureTimestamp: 最后一个 ledger 失败的时间。
waitingCursorsCount:等待新消息发布并标记为 “caught up”的游标数量。
pendingAddEntriesCount:完成(异步)写请求的消息数。
lastConfirmedEntry:最后一条成功写入消息的 ledger:entryid。 如果 entryid 为
-1
,即 ledger 是开启状态,但没有写入任何的 entry 。state: The state of this ledger for writing.
LedgerOpened
意味着 ledger 是开启状态,可以保存已发布的消息。ledgers:主题中所保存消息的有序 ledger 列表。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。
metadata: 该 ledger 的元数据。
schemaLedgers:该主题模式下所有 ledger 的有序列表。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。
metadata: 该 ledger 的元数据。
compactedLedger: The ledgers holding un-acked messages after topic compaction.
ledgerId: 此 ledger 的 ID。
entries:属于该 ledger 的 entry 总数。
size:写入该 ledger 的消息大小(以字节为单位)。
offloaded:该 ledger 是否已卸载。 The value is
false
for the compacted topic ledger.
cursors: The list of all cursors on this topic. Each subscription in the topic stats has a cursor.
markDeletePosition: All messages before the markDeletePosition are acknowledged by the subscriber.
readPosition: The latest position of subscriber for reading message.
waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting for new messages to be published.
pendingReadOps: The counter for how many outstanding read requests to the BookKeepers in progress.
messagesConsumedCounter: The number of messages this cursor has acked since this broker loaded this topic.
cursorLedger: The ledger being used to persistently store the current markDeletePosition.
cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition.
individuallyDeletedMessages: If acknowledges are being done out of order, the ranges of messages acknowledged between the markDeletePosition and the read-position shows.
lastLedgerSwitchTimestamp: The last time the cursor ledger is rolled over.
state: The state of the cursor ledger:
Open
means you have a cursor ledger for saving updates of the markDeletePosition.
下面是关于一个主题的详细统计示例。
{
"entriesAddedCounter":0,
"numberOfEntries":0,
"totalSize":0,
"currentLedgerEntries":0,
"currentLedgerSize":0,
"lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
"lastLedgerCreationFailureTimestamp":null,
"waitingCursorsCount":0,
"pendingAddEntriesCount":0,
"lastConfirmedEntry":"3:-1",
"state":"LedgerOpened",
"ledgers":[
{
"ledgerId":3,
"entries":0,
"size":0,
"offloaded":false,
"metadata":null
}
],
"cursors":{
"test":{
"markDeletePosition":"3:-1",
"readPosition":"3:-1",
"waitingReadOp":false,
"pendingReadOps":0,
"messagesConsumedCounter":0,
"cursorLedger":4,
"cursorLedgerLastEntry":1,
"individuallyDeletedMessages":"[]",
"lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
"state":"Open",
"numberOfEntriesSinceFirstNotAckedMessage":0,
"totalNonContiguousDeletedMessagesRange":0,
"properties":{
}
}
},
"schemaLedgers":[
{
"ledgerId":1,
"entries":11,
"size":10,
"offloaded":false,
"metadata":null
}
],
"compactedLedger":{
"ledgerId":-1,
"entries":-1,
"size":-1,
"offloaded":false,
"metadata":null
}
}
可以使用以下方式来获取一个主题的内部状态。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats-internal \ persistent://test-tenant/ns1/tp1 \
GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getInternalStats(topic);
查看消息
可以使用以下方式为某一主题的特定订阅提供一些信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics peek-messages \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \Message ID: 315674752:0Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }msg-payload
GET /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition
Get message by ID
可以使用以下方式获取给定 ledger ID 和 entry ID 的信息。
pulsar-admin
REST API
Java
$ ./bin/pulsar-admin topics get-message-by-id \ persistent://public/default/my-topic \ -l 10 -e 0
GET /admin/v2/:schema/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId
String topic = "persistent://my-tenant/my-namespace/my-topic";long ledgerId = 10;long entryId = 10;admin.topics().getMessageById(topic, ledgerId, entryId);
跳过消息
可以使用以下方式跳过某一主题的特定订阅的一些信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics skip \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";int numMessages = 1;admin.topics().skipMessages(topic, subName, numMessages);
跳过所有消息
跳过某一主题的特定订阅的所有旧消息。
pulsar-admin
REST API
Java
$ pulsar-admin topics skip-all \ --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \
POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip_all
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";admin.topics().skipAllMessages(topic, subName);
可以将一个订阅的游标位置重新设置为 X 分钟前记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。 你可以用下面方式重置 cursor。
pulsar-admin
REST API
Java
$ pulsar-admin topics reset-cursor \ --subscription my-subscription --time 10 \ persistent://test-tenant/ns1/tp1 \
String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";long timestamp = 2342343L;admin.topics().skipAllMessages(topic, subName, timestamp);
查询topic
可以通过以下方式找到服务于特定主题的 broker URL。
pulsar-admin
REST API
Java
$ pulsar-admin topics lookup \ persistent://test-tenant/ns1/tp1 \ "pulsar://broker1.org.com:4480"
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().lookupDestination(topic);
获取bundle
可以通过以下方式检查包含给定主题的 bundle 范围。
pulsar-admin
REST API
Java
$ pulsar-admin topics bundle-range \ persistent://test-tenant/ns1/tp1 \ "0x00000000_0xffffffff"
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().getBundleRange(topic);
获取订阅
可以通过以下方式查看某个主题的所有订阅名称。
pulsar-admin
REST API
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getSubscriptions(topic);
最后一条消息Id
可以获得一个持久化主题最后提交的消息 ID 。 2.3.0 往后版本都可用。
pulsar-admin
REST API
Java
pulsar-admin topics last-message-id topic-name
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getLastMessage(topic);
配置去重快照间隔
获取去重快照间隔
获取主题级去重快照间隔可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-deduplication-snapshot-interval options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
admin.topics().getDeduplicationSnapshotInterval(topic)
设置去重快照间隔
设置主题级去重快照间隔,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-deduplication-snapshot-interval options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
{ "interval": 1000}
admin.topics().setDeduplicationSnapshotInterval(topic, 1000)
移除去重快照间隔
移除主题级去重快照间隔,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics remove-deduplication-snapshot-interval options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval?version=[[pulsar:version_number]]}
admin.topics().removeDeduplicationSnapshotInterval(topic)
配置非活跃主题策略
获取非活跃主题策略
获取主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-inactive-topic-policies options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().getInactiveTopicPolicies(topic)
设置非活跃主题策略
设置主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-inactive-topic-policies options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies)
移除非活跃主题策略
移除主题级非活跃主题策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics remove-inactive-topic-policies options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies?version=[[pulsar:version_number]]}
admin.topics().removeInactiveTopicPolicies(topic)
配置卸载策略
获取卸载策略
获取主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics get-offload-policies options
{@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().getOffloadPolicies(topic)
设置卸载策略
设置主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
REST API
Java API
pulsar-admin topics set-offload-policies options
{@inject: endpoint|POST|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().setOffloadPolicies(topic, offloadPolicies)
移除卸载策略
移除主题级卸载策略,可以使用以下方法之一。
Pulsar-admin API
Java API
pulsar-admin topics remove-offload-policies options
{@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies?version=[[pulsar:version_number]]}
admin.topics().removeOffloadPolicies(topic)
可以使用 Pulsar admin API 来创建、删除和检查非分区主题的状态。
创建
必须明确创建非分区主题。 当创建一个新的非分区主题时,需要为该主题提供一个名称。
默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled
to false
. 设置 brokerDeleteInactiveTopicsFrequencySeconds
为特殊值以改变检查非活动主题的频率。
关于这两个参数的更多信息,请参阅 这里。
可以通过以下方式创建非分区主题。
pulsar-admin
REST API
Java
When you create non-partitioned topics with the command, you need to specify the topic name as an argument.
$ bin/pulsar-admin topics create \ persistent://my-tenant/my-namespace/my-topic
PUT /admin/v2/:schema/:tenant/:namespace/:topic
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createNonPartitionedTopic(topicName);
删除
可以通过以下方式来删除非分区主题。
pulsar-admin
REST API
Java
DELETE /admin/v2/:schema/:tenant/:namespace/:topic
admin.topics().delete(topic);
获取资源列表
你可以通过以下方式获得特定命名空间下的 topic 列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2
GET /admin/v2/:schema/:tenant/:namespace
admin.topics().getList(namespace);
统计信息
检查某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats。
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
可以通过以下方式检查某个主题及其相关生产者和消费者的当前统计数据。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats \ persistent://test-tenant/namespace/topic \ --get-precise-backlog
admin.topics().getStats(topic, false /* is precise backlog */);
可以使用 Pulsar admin API 来创建、更新、删除和检查分区主题的状态。
必须明确创建分区主题。 当创建一个新的分区主题时,需要为该主题提供一个名称和分区数量。
默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled
to false
. 设置 brokerDeleteInactiveTopicsFrequencySeconds
为特殊值以改变检查非活动主题的频率。
关于这两个参数的更多信息,请参阅 。
可以通过以下方式创建分区主题。
pulsar-admin
REST API
Java
When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p
or --partitions
flag.
$ bin/pulsar-admin topics create-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 4
String topicName = "persistent://my-tenant/my-namespace/my-topic";int numPartitions = 4;admin.topics().createPartitionedTopic(topicName, numPartitions);
Create missed partitions
在主题的 auto-creation 是禁用状态并且有一个没有任何分区的主题时,可以使用 命令为主题创建分区。
pulsar-admin
REST API
Java
可以用 create-missed-partitions 命令指定主题名称作为参数来创建 miss 分区。
$ bin/pulsar-admin topics create-missed-partitions \ persistent://my-tenant/my-namespace/my-topic \
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createMissedPartitions(topicName);
获取元数据
已分区的主题与元数据相关联,可以将其看作一个 JSON 对象。 以下元数据字段是可用的。
pulsar-admin
REST API
Java
可以通过 子命令检查分区主题的分区数量。
$ pulsar-admin topics get-partitioned-topic-metadata \ persistent://my-tenant/my-namespace/my-topic{ "partitions": 4}
GET /admin/v2/:schema/:tenant/:namespace/:topic/partitions
String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPartitionedTopicMetadata(topicName);
更新
如果 主题是非全局的,你可以更新现有已分区主题的分区数量。 然而,你只能添加分区号。 减少分区的数量就会删除对应主题,在 Pulsar 中是不支持的。
生产者和消费者可以自动找到新创建的分区。
pulsar-admin
REST API
Java
可以使用 update-partitioned-topic 命令更新分区主题。
$ pulsar-admin topics update-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 8
admin.topics().updatePartitionedTopic(topic, numPartitions);
删除
可以使用 命令、REST API 或者 Java 删除分区主题。
pulsar-admin
REST API
Java
$ bin/pulsar-admin topics delete-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic
DELETE /admin/v2/:schema/:topic/:namespace/:destination/partitions
admin.topics().delete(topic);
获取资源列表
你可以通过以下方式获取给定命名空间下的分区主题列表。
pulsar-admin
REST API
Java
$ pulsar-admin topics list-partitioned-topics tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2
GET /admin/v2/:schema/:tenant/:namespace
admin.topics().getPartitionedTopicList(namespace);
统计信息
可以查看某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats。
请注意,在订阅的 JSON 对象中, chuckedMessageRate
已被废弃。 请使用 chunkedMessageRate
。 两者目前都将发送到 JSON 中。
{
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 270318763,
"msgInCounter" : 252489,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1070.926056966454,
"msgChunkPublished" : false,
"storageSize" : 270316646,
"backlogSize" : 200921133,
"publishers" : [ {
"msgThroughputIn" : 1070918.4635439808,
"averageMsgSize" : 1070.3333333333333,
"chunkedMessageRate" : 0.0,
"producerId" : 0
} ],
"subscriptions" : {
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"chunkedMessageRate" : 0,
"msgBacklog" : 144318,
"msgBacklogNoDelayed" : 144318,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"metadata" : {
"partitions" : 3
},
"partitions" : { }
}
你可以通过以下方式检查给定分区主题及其当前连接的生产者和消费者的统计信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics partitioned-stats \ persistent://test-tenant/namespace/topic \ --per-partition
admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);
Internal stats
你可以检查主题的详细统计数据。 The following is an example. 有关每个统计信息的说明,详见。
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
你可以通过以下方式获取分区主题的内部统计信息。
pulsar-admin
REST API
Java
$ pulsar-admin topics stats-internal \ persistent://test-tenant/namespace/topic
GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats
admin.topics().getInternalStats(topic);
默认情况下,Pulsar 主题由单个 broker 提供服务,这限制了主题的最大吞吐量。 分区主题可以跨越多个 broker,从而实现更高的吞吐量。
你可以使用 Pulsar 客户端库发布到分区主题。 发布到分区主题时,必须指定路由模式。 如果在创建新的生产者时没有指定任何路由方式,则使用轮询路由模式。
Routing mode
You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. 路由模式决定了每条消息应该发往哪个分区(内部主题)。
以下 MessageRoutingMode 选项是可用的。
发送模式 | 说明 |
---|---|
RoundRobinPartition | 如果没有提供 key,生产者将在所有分区中以轮训策略进行发布消息,以达到最大的吞吐量。 请注意轮训并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。 这是默认的模式。 |
SinglePartition | 如果消息没有指定 key,生产者会随机挑选一个分区,并发布所有消息到该分区。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。 |
CustomPartition | 使用自定义消息路由器实现来决定特定消息的分区。 你可以通过使用 Java 客户端和实现 接口来创建一个自定义路由模式。 |
如下所示:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-namespace/my-topic";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("Partitioned topic message".getBytes());
Custom message router
要使用自定义消息路由器,您需要提供 接口的实现,该接口只有一个choosePartition
方法:
public interface MessageRouter extends Serializable {
int choosePartition(Message msg);
}
下面的路由模式表示所有的消息都会被发送到分区10:
public class AlwaysTenRouter implements MessageRouter {
public int choosePartition(Message msg) {
return 10;
}
}
通过该实现,你可以发送:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRouter(new AlwaysTenRouter())
.create();
producer.send("Partitioned topic message".getBytes());
使用 Key 时如何选择分区
If a message has a key, it supersedes the round robin routing policy. 以下示例说明当使用key时如何选择分区。
// 如果消息存在key,轮询路由策略将被替换
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
if (isBatchingEnabled) { // 如果开启批处理,请在 `partitionSwitchMs` 边界上选择分区。
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartiations());
other.
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartiations());
}
你可以使用 创建、检查和删除订阅。
创建订阅
You can create a subscription for a topic using one of the following methods.
pulsar-admin
REST API
Java
pulsar-admin topics create-subscription \--subscription my-subscription \persistent://test-tenant/ns1/tp1
String topic = "persistent://my-tenant/my-namespace/my-topic";String subscriptionName = "my-subscription";admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);
### 获取订阅 You can check all subscription names for a given topic using one of the following methods.
pulsar-admin
REST API
Java
pulsar-admin topics subscriptions \persistent://test-tenant/ns1/tp1 \my-subscription
GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions
String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getSubscriptions(topic);
### 取消订阅 When a subscription does not process messages any more, you can unsubscribe it using one of the following methods.
pulsar-admin
REST API
Java
String topic = "persistent://my-tenant/my-namespace/my-topic";String subscriptionName = "my-subscription";admin.topics().deleteSubscription(topic, subscriptionName);