The diagram below illustrates the process of geo-replication across Pulsar clusters:

In this diagram, whenever producers P1, P2, and P3 publish messages to topic T1 on Cluster-A, Cluster-B, and Cluster-C respectively, those messages are immediately replicated across clusters. Once the messages are replicated, consumers C1 and C2 can consume them from their respective clusters.

Without geo-replication, consumers C1 and C2 cannot consume messages that producer P3 publishes.

In Pulsar, geo-replication is enabled on a per-tenant basis. You can enable geo-replication between two clusters only when a tenant has been created that is allowed to access both clusters.

Although geo-replication must be enabled between two clusters, it is actually managed at the namespace level. You must complete the following tasks to enable geo-replication for a namespace:

  • Configure the namespace so that it is replicated across two or more clusters

Any message published on any topic in that namespace is replicated to all clusters in the specified set.

When messages are produced on a Pulsar topic, messages are first persisted in the local cluster, and then forwarded asynchronously to the remote clusters.

Under normal conditions, when there are no connectivity issues, messages are replicated immediately, at the same time as they are dispatched to local consumers. End-to-end delivery latency is typically determined by the network round-trip time (RTT) between the remote regions.

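Applications can create producers and consumers in any of the clusters, even when the remote clusters are not reachable (for example, during a network partition).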
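The flow described above (persist locally, then forward asynchronously) can be sketched as a small in-memory model. This is a hypothetical illustration, not Pulsar code: the class GeoReplicationSketch, its per-remote backlogs, and the unreachable set are assumptions made for the example. It shows that a publish completes once the message is persisted locally, and that a partition only makes a remote's backlog grow until connectivity returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of one cluster's side of geo-replication.
// Not Pulsar code: the class, fields, and methods are illustrative only.
class GeoReplicationSketch {
    final List<String> localLog = new ArrayList<>();             // messages persisted in this cluster
    final Map<String, Deque<String>> backlogs = new HashMap<>(); // per-remote queue of messages to forward
    final Set<String> unreachable = new HashSet<>();             // remotes cut off, e.g. by a partition

    GeoReplicationSketch(List<String> remoteClusters) {
        for (String remote : remoteClusters) backlogs.put(remote, new ArrayDeque<>());
    }

    // A publish completes once the message is persisted locally; remotes are not on this path.
    void publish(String msg) {
        localLog.add(msg);                                                // 1. persist locally
        for (Deque<String> queue : backlogs.values()) queue.addLast(msg); // 2. queue for forwarding
    }

    // Called explicitly in this model; a real system would run forwarding asynchronously.
    void forward(Map<String, GeoReplicationSketch> clusters) {
        for (Map.Entry<String, Deque<String>> e : backlogs.entrySet()) {
            if (unreachable.contains(e.getKey())) continue; // keep the backlog until reachable
            GeoReplicationSketch remote = clusters.get(e.getKey());
            Deque<String> queue = e.getValue();
            // Append directly to the remote log so replicated messages are not forwarded again.
            while (!queue.isEmpty()) remote.localLog.add(queue.pollFirst());
        }
    }

    public static void main(String[] args) {
        GeoReplicationSketch a = new GeoReplicationSketch(List.of("cluster-b"));
        GeoReplicationSketch b = new GeoReplicationSketch(List.of("cluster-a"));
        Map<String, GeoReplicationSketch> clusters = Map.of("cluster-a", a, "cluster-b", b);

        a.unreachable.add("cluster-b"); // simulate a network partition
        a.publish("m1");                // still accepted and persisted in cluster-a
        a.forward(clusters);
        System.out.println("cluster-b during partition: " + b.localLog);

        a.unreachable.clear();          // partition heals
        a.forward(clusters);
        System.out.println("cluster-b after recovery: " + b.localLog);
    }
}
```

Keeping a separate queue per remote cluster means one unreachable region does not hold back replication to the others.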

In the aforementioned example, the T1 topic is replicated among three clusters, Cluster-A, Cluster-B, and Cluster-C.

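All messages produced in any of the three clusters are delivered to all subscriptions in the other clusters. In this case, consumers C1 and C2 receive all messages that producers P1, P2, and P3 publish. Ordering is still guaranteed on a per-producer basis.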
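To make the per-producer ordering guarantee concrete, here is a hypothetical consumer-side check. It assumes each received message carries its producer name and a per-producer sequence number; the Received class and isOrderedPerProducer method are illustrative names, not Pulsar API. An interleaving of P1, P2, and P3 passes as long as each producer's own messages arrive in order, even though there is no global order across producers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical check of the guarantee described above: order holds per producer, not globally.
class PerProducerOrderSketch {
    // A received message: the producer that sent it and that producer's sequence number.
    static final class Received {
        final String producer;
        final long sequenceId;

        Received(String producer, long sequenceId) {
            this.producer = producer;
            this.sequenceId = sequenceId;
        }
    }

    // True if, for every producer, sequence numbers arrive in strictly increasing order.
    static boolean isOrderedPerProducer(List<Received> stream) {
        Map<String, Long> last = new HashMap<>();
        for (Received m : stream) {
            Long prev = last.get(m.producer);
            if (prev != null && m.sequenceId <= prev) return false;
            last.put(m.producer, m.sequenceId);
        }
        return true;
    }

    public static void main(String[] args) {
        // One possible interleaving as C1 might see it: producers are mixed, each in its own order.
        List<Received> seenByC1 = List.of(
                new Received("P3", 0), new Received("P1", 0), new Received("P3", 1),
                new Received("P2", 0), new Received("P1", 1));
        System.out.println("ordered per producer: " + isOrderedPerProducer(seenByC1));
    }
}
```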

As mentioned above, geo-replication in Pulsar is managed at the tenant level.

To replicate to a cluster, the tenant needs permission to use that cluster. You can grant permission to the tenant when you create it, or grant it later.

Specify all the intended clusters when you create a tenant:

To update permissions of an existing tenant, use update instead of create.
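The permission requirement can be expressed as a subset check: every cluster a tenant's namespace replicates to must be among the clusters the tenant is allowed to use. The sketch below is a hypothetical helper that assumes this rule; disallowedClusters is not a Pulsar function, and eu-central is an example cluster name.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the tenant permission rule: replication is possible only
// to clusters that the tenant has been granted permission to use.
class TenantClusterCheckSketch {
    // Returns the requested clusters the tenant may NOT use; an empty set means the request is valid.
    static Set<String> disallowedClusters(Set<String> tenantAllowed, Set<String> requested) {
        Set<String> missing = new TreeSet<>(requested); // sorted for stable output
        missing.removeAll(tenantAllowed);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> allowed = Set.of("us-west", "us-east");
        System.out.println(disallowedClusters(allowed, Set.of("us-west", "us-east")));
        System.out.println(disallowedClusters(allowed, Set.of("us-west", "eu-central")));
    }
}
```

In this model, granting the tenant access to eu-central (at creation time or through an update) is what turns the second request into a valid one.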

Enable geo-replication namespaces

You can create a namespace with the following example command.

  $ bin/pulsar-admin namespaces create my-tenant/my-namespace

Initially, the namespace is not assigned to any cluster. You can assign the namespace to clusters using the set-clusters subcommand:

You can change the replication clusters for a namespace at any time, without disruption to ongoing traffic. Replication channels are set up or stopped immediately in all clusters as soon as the configuration changes.
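Because the replication clusters can change at any time, it helps to see which channels a change actually touches. In this hypothetical sketch (the class and method names, and the cluster name us-cent, are assumptions), only added clusters get new channels and only removed clusters lose theirs; clusters present before and after the change keep their channels, which fits with ongoing traffic continuing undisrupted.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: which replication channels to start or stop when a
// namespace's replication clusters change from `before` to `after`.
class ReplicationChannelDiffSketch {
    // Clusters newly added: a channel must be set up.
    static Set<String> toStart(Set<String> before, Set<String> after) {
        Set<String> start = new TreeSet<>(after);
        start.removeAll(before);
        return start;
    }

    // Clusters removed: their channel must be stopped.
    static Set<String> toStop(Set<String> before, Set<String> after) {
        Set<String> stop = new TreeSet<>(before);
        stop.removeAll(after);
        return stop;
    }

    public static void main(String[] args) {
        Set<String> before = Set.of("us-west", "us-east");
        Set<String> after = Set.of("us-west", "us-cent");
        System.out.println("start: " + toStart(before, after));
        System.out.println("stop: " + toStop(before, after));
        // us-west is in both sets, so its channel is left untouched.
    }
}
```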

Once you create a geo-replication namespace, any topics that producers or consumers create within that namespace are replicated across clusters. Typically, each application uses the serviceUrl for the local cluster.

Selective replication

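The following is an example using the Java API. Note the use of the replicationClusters method when you construct the message:

  import java.util.Arrays;
  import java.util.List;
  import org.apache.pulsar.client.api.Producer;

  // Assumes `client` is an existing PulsarClient; the message is replicated only to these clusters
  List<String> restrictReplicationTo = Arrays.asList("us-west", "us-east");

  Producer<byte[]> producer = client.newProducer()
          .topic("some-topic")
          .create();

  producer.newMessage()
          .value("my-payload".getBytes())
          .replicationClusters(restrictReplicationTo)
          .send();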
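The effect of a per-message replication list can be modeled as a filter over the namespace's replication clusters. This is a hypothetical sketch, assuming a restricted message reaches only the remote clusters that are both configured for the namespace and named in the message's list; SelectiveReplicationSketch and targets are illustrative names, not part of the Pulsar client, and us-cent is an example cluster.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of selective replication (not Pulsar internals).
class SelectiveReplicationSketch {
    // Remote clusters that receive a message published in localCluster.
    // messageList == null models a message without a replication list (no restriction).
    static Set<String> targets(String localCluster, Set<String> namespaceClusters, List<String> messageList) {
        Set<String> out = new TreeSet<>(); // sorted for stable output
        for (String cluster : namespaceClusters) {
            boolean listed = messageList == null || messageList.contains(cluster);
            if (!cluster.equals(localCluster) && listed) out.add(cluster);
        }
        return out;
    }

    public static void main(String[] args) {
        Set<String> namespaceClusters = Set.of("us-west", "us-east", "us-cent");
        List<String> restrictReplicationTo = List.of("us-west", "us-east");
        System.out.println(targets("us-west", namespaceClusters, restrictReplicationTo)); // [us-east]
        System.out.println(targets("us-west", namespaceClusters, null));                // [us-cent, us-east]
    }
}
```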

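Topic stats

Topic-specific statistics for geo-replication topics are available via the pulsar-admin tool and the API.

Each cluster reports its own local stats, including the incoming and outgoing replication rates and backlogs.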
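Since each cluster reports only its local numbers, a cross-region view has to combine one report per cluster. The sketch below is hypothetical: the LocalReport fields only loosely mirror the rates and backlogs mentioned above and are not the exact field names returned by pulsar-admin, and the sample values are made up.

```java
import java.util.List;

// Hypothetical aggregation of per-cluster replication reports for one topic.
class ReplicationStatsSketch {
    // One cluster's locally reported replication numbers (illustrative fields).
    static final class LocalReport {
        final String cluster;
        final double replicationRateIn;  // msg/s arriving from remote clusters
        final double replicationRateOut; // msg/s sent to remote clusters
        final long replicationBacklog;   // messages still queued for replication

        LocalReport(String cluster, double rateIn, double rateOut, long backlog) {
            this.cluster = cluster;
            this.replicationRateIn = rateIn;
            this.replicationRateOut = rateOut;
            this.replicationBacklog = backlog;
        }
    }

    // Name of the cluster with the largest replication backlog, or null if there are no reports.
    static String mostBacklogged(List<LocalReport> reports) {
        LocalReport worst = null;
        for (LocalReport r : reports) {
            if (worst == null || r.replicationBacklog > worst.replicationBacklog) worst = r;
        }
        return worst == null ? null : worst.cluster;
    }

    public static void main(String[] args) {
        // One report per cluster, e.g. gathered by querying each cluster separately.
        List<LocalReport> reports = List.of(
                new LocalReport("us-west", 120.0, 95.0, 40),
                new LocalReport("us-east", 80.0, 110.0, 2500));
        System.out.println("largest backlog: " + mostBacklogged(reports));
    }
}
```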

Delete a geo-replication topic

Given that geo-replication topics exist in multiple regions, directly deleting a geo-replication topic is not possible. Instead, you should rely on automatic topic garbage collection.

In Pulsar, a topic is automatically deleted when the topic meets the following three conditions:

  • no producers or consumers are connected to it;
  • no subscriptions to it;
  • no more messages are kept for retention. For geo-replication topics, each region uses a fault-tolerant mechanism to decide when deleting the topic locally is safe.

You can explicitly disable topic garbage collection by setting brokerDeleteInactiveTopicsEnabled to false in your broker configuration.
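The three deletion conditions, together with the brokerDeleteInactiveTopicsEnabled switch, can be written as a single predicate. This is a simplified, hypothetical sketch of one cluster's local decision; it does not model the fault-tolerant mechanism each region uses for geo-replication topics, and eligibleForDeletion is not a Pulsar API.

```java
// Hypothetical predicate mirroring the automatic deletion conditions (not Pulsar's implementation).
class TopicGcSketch {
    static boolean eligibleForDeletion(boolean brokerDeleteInactiveTopicsEnabled,
                                       int producers, int consumers,
                                       int subscriptions, long retainedMessages) {
        if (!brokerDeleteInactiveTopicsEnabled) return false; // garbage collection explicitly disabled
        return producers == 0 && consumers == 0 // no producers or consumers connected
                && subscriptions == 0           // no subscriptions
                && retainedMessages == 0;       // no messages kept for retention
    }

    public static void main(String[] args) {
        System.out.println(eligibleForDeletion(true, 0, 0, 0, 0));  // idle topic
        System.out.println(eligibleForDeletion(true, 0, 0, 1, 0));  // a subscription remains
        System.out.println(eligibleForDeletion(false, 0, 0, 0, 0)); // GC disabled in the broker
    }
}
```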

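To delete a geo-replication topic, close all producers and consumers on the topic, and delete all of its local subscriptions in every replication cluster. When Pulsar determines that no valid subscription for the topic remains across the system, it garbage collects the topic.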
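The deletion procedure amounts to a system-wide check: the topic becomes collectable only after its local subscriptions are gone in every replication cluster. In this hypothetical model (the class and method names are illustrative, not Pulsar API), removing the subscriptions in one cluster is not enough while another cluster still has some.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of deleting a geo-replication topic by removing subscriptions everywhere.
class GeoTopicDeletionSketch {
    // Cluster name -> names of the topic's local subscriptions in that cluster.
    final Map<String, Set<String>> subscriptionsByCluster = new HashMap<>();

    void addSubscription(String cluster, String subscription) {
        subscriptionsByCluster.computeIfAbsent(cluster, k -> new HashSet<>()).add(subscription);
    }

    void deleteLocalSubscriptions(String cluster) {
        Set<String> subs = subscriptionsByCluster.get(cluster);
        if (subs != null) subs.clear();
    }

    // True once no valid subscription for the topic remains in any cluster.
    boolean collectable() {
        for (Set<String> subs : subscriptionsByCluster.values()) {
            if (!subs.isEmpty()) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        GeoTopicDeletionSketch topic = new GeoTopicDeletionSketch();
        topic.addSubscription("us-west", "my-subscription");
        topic.addSubscription("us-east", "my-subscription");

        topic.deleteLocalSubscriptions("us-west");
        System.out.println("after us-west: " + topic.collectable());
        topic.deleteLocalSubscriptions("us-east");
        System.out.println("after us-east: " + topic.collectable());
    }
}
```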

Pulsar supports replicated subscriptions, so you can keep subscription state in sync, within a sub-second timeframe, in the context of a topic that is being asynchronously replicated across multiple geographical regions.

In case of failover, a consumer can restart consuming from the failure point in a different cluster.
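Failover with a replicated subscription can be pictured with a simplified model in which the acknowledged position is copied to the other cluster at discrete sync points. This sketch is hypothetical: it assumes message positions are plain indexes aligned across clusters (real message IDs differ per cluster), and the class and method names are illustrative. Under those assumptions the consumer resumes from the last synchronized position, so messages acknowledged after the last sync may be delivered again; a shorter sync interval narrows that window.

```java
// Hypothetical model of subscription-state sync between two clusters (not Pulsar internals).
class ReplicatedSubscriptionSketch {
    long ackedInPrimary = -1;    // last message index acknowledged in the primary cluster
    long syncedToSecondary = -1; // last acknowledged index propagated to the secondary cluster

    void acknowledge(long index) {
        ackedInPrimary = Math.max(ackedInPrimary, index);
    }

    // In this model, called periodically to propagate the subscription position.
    void sync() {
        syncedToSecondary = ackedInPrimary;
    }

    // First message index the consumer receives after failing over to the secondary cluster.
    long resumeIndexAfterFailover() {
        return syncedToSecondary + 1;
    }

    public static void main(String[] args) {
        ReplicatedSubscriptionSketch s = new ReplicatedSubscriptionSketch();
        for (long i = 0; i <= 9; i++) s.acknowledge(i);
        s.sync();          // secondary now knows messages 0..9 are consumed
        s.acknowledge(10);
        s.acknowledge(11); // primary fails before the next sync
        long resume = s.resumeIndexAfterFailover();
        long redelivered = s.ackedInPrimary - resume + 1;
        System.out.println("resume at " + resume + ", redelivered: " + redelivered);
    }
}
```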

Enable replicated subscription

Replicated subscription is disabled by default. You can enable replicated subscription when creating a consumer.

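  import org.apache.pulsar.client.api.Consumer;
  import org.apache.pulsar.client.api.Schema;

  // Assumes `client` is an existing PulsarClient connected to the local cluster
  Consumer<String> consumer = client.newConsumer(Schema.STRING)
          .topic("my-topic")
          .subscriptionName("my-subscription")
          .replicateSubscriptionState(true) // replicated subscription is disabled by default
          .subscribe();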

Advantages

  • It is easy to implement the logic.
  • You can choose to enable or disable replicated subscription.
  • When you enable it, the overhead is low, and it is easy to configure.
  • When you disable it, the overhead is zero.

Limitations