A Pulsar instance consists of multiple Pulsar clusters working in unison. You can distribute clusters across data centers or geographical regions and replicate the clusters amongst themselves using geo-replication. Deploying a multi-cluster Pulsar instance involves the following basic steps:

  • Deploying two separate ZooKeeper quorums: a local quorum for each cluster in the instance and a configuration store quorum for instance-wide tasks
  • Initializing cluster metadata for each cluster
  • Deploying a BookKeeper cluster of bookies in each Pulsar cluster
  • Deploying brokers in each Pulsar cluster

If you want to deploy a single Pulsar cluster, see the documentation on deploying a single cluster.

Pulsar is currently available for macOS and Linux. To use Pulsar, you need to install Java 8 from the Oracle download center.

Install Pulsar

Before you start running Pulsar, download a binary tarball release in one of the following ways:

Once you download the tarball, untar it and cd into the resulting directory:

  $ tar xvfz apache-pulsar-2.6.1-bin.tar.gz
  $ cd apache-pulsar-2.6.1

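What your package contains

The Pulsar binary package initially contains the following directories:

The following directories are created once you begin running Pulsar:

Directory    Contents
data         The data storage directory that ZooKeeper and BookKeeper use
instances    Artifacts created for Pulsar Functions
logs         Logs that the installation creates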

Each Pulsar instance relies on two separate ZooKeeper quorums.

  • Local ZooKeeper operates at the cluster level and provides cluster-specific configuration management and coordination. Each Pulsar cluster needs its own dedicated ZooKeeper cluster.
  • The configuration store operates at the instance level and provides configuration management for the entire system (and thus across clusters). An independent cluster of machines or the same machines that local ZooKeeper uses can provide the configuration store quorum.


Deploy local ZooKeeper

ZooKeeper manages a variety of essential coordination-related and configuration-related tasks for Pulsar.

To deploy a Pulsar instance, you need to stand up one local ZooKeeper cluster per Pulsar cluster.

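To begin, add all ZooKeeper servers to the quorum configuration specified in the conf/zookeeper.conf file. Add a server.N line for each node in the cluster to the configuration, where N is the number of the ZooKeeper node. The following is an example for a three-node cluster:

  server.1=zk1.us-west.example.com:2888:3888
  server.2=zk2.us-west.example.com:2888:3888
  server.3=zk3.us-west.example.com:2888:3888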
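
Typing the server.N lines by hand gets error-prone as a quorum grows. The following is a minimal sketch of generating them from a list of hosts. The function name zk_server_lines is hypothetical, and the ports 2888 and 3888 are simply the ones used in the example above.

```shell
# Hypothetical helper: print one server.N line per ZooKeeper host,
# numbering the hosts from 1 in the order given.
zk_server_lines() {
  n=1
  for host in "$@"; do
    printf 'server.%d=%s:2888:3888\n' "$n" "$host"
    n=$((n + 1))
  done
}

# Example: the three-node us-west quorum.
zk_server_lines \
  zk1.us-west.example.com \
  zk2.us-west.example.com \
  zk3.us-west.example.com
```

The output could be appended to the ZooKeeper configuration file on every node, as long as each node receives the hosts in the same order.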

On each host, you need to specify the node ID in the myid file, which is in the data/zookeeper folder of each server by default (you can change the file location via the dataDir parameter).

On a ZooKeeper server at zk1.us-west.example.com, for example, you could set the myid value like this:

  $ mkdir -p data/zookeeper
  $ echo 1 > data/zookeeper/myid

On zk2.us-west.example.com the command looks like echo 2 > data/zookeeper/myid and so on.
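
Because the myid value has to agree with the N of the matching server.N line, it can be looked up rather than typed. The sketch below assumes the server.N format shown earlier. The helper name myid_for_host and the temporary directory are illustrative only, not part of Pulsar.

```shell
# Hypothetical helper: print the N of the server.N line that names a host.
# Arguments: path to the ZooKeeper configuration file, host name.
myid_for_host() {
  awk -F'[=:]' -v host="$2" '
    $1 ~ /^server\.[0-9]+$/ && $2 == host {
      sub(/^server\./, "", $1)
      print $1
    }' "$1"
}

# Demo against a throwaway copy of the example configuration.
workdir=$(mktemp -d)
cat > "$workdir/zookeeper.conf" <<'EOF'
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
EOF

id=$(myid_for_host "$workdir/zookeeper.conf" zk2.us-west.example.com)
mkdir -p "$workdir/data/zookeeper"
echo "$id" > "$workdir/data/zookeeper/myid"
cat "$workdir/data/zookeeper/myid"
rm -rf "$workdir"
```

On a real host you would point the helper at the actual configuration file and write the result to the myid file under the configured dataDir.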

Once you add each server to the zookeeper.conf configuration and each server has the appropriate myid entry, you can start ZooKeeper on all hosts (in the background, using nohup) with the pulsar-daemon CLI tool:

  $ bin/pulsar-daemon start zookeeper

Deploy the configuration store

The ZooKeeper cluster that you configured and started in the section above is a local ZooKeeper cluster that you can use to manage a single Pulsar cluster. In addition to a local cluster, however, a full Pulsar instance also requires a configuration store to handle some instance-level configuration and coordination tasks.

If you deploy a single-cluster instance, you do not need a separate cluster for the configuration store. If, however, you deploy a multi-cluster instance, you should stand up a separate ZooKeeper cluster for configuration tasks.

Single-cluster Pulsar instance

To deploy a ZooKeeper configuration store in a single-cluster instance, add the same ZooKeeper servers that the local quorum uses to the configuration file in conf/global_zookeeper.conf using the same method as for local ZooKeeper, but make sure to use a different port (2181 is the default for ZooKeeper). The following is an example that uses port 2184 for a three-node ZooKeeper cluster:

  clientPort=2184
  server.1=zk1.us-west.example.com:2185:2186
  server.2=zk2.us-west.example.com:2185:2186
  server.3=zk3.us-west.example.com:2185:2186

As before, create the myid file for each server in data/global-zookeeper/myid.

Multi-cluster Pulsar instance

When you deploy a global Pulsar instance, with clusters distributed across different geographical regions, the configuration store serves as a highly available and strongly consistent metadata store that can tolerate failures and partitions spanning whole regions.

The key here is to make sure that the ZK quorum members are spread across at least 3 regions and that the other regions run as observers.

Again, given the very low expected load on the configuration store servers, you can share the same hosts used for the local ZooKeeper quorum.

For example, assume a Pulsar instance with the following clusters: us-west, us-east, us-central, eu-central, and ap-south. Also assume that each cluster has its own local ZK servers, named as follows:

  zk[1-3].${CLUSTER}.example.com

In this scenario, you pick the quorum participants from a few clusters and let all the others be ZK observers. For example, to form a 7-server quorum, you can pick 3 servers from us-west, 2 from us-central, and 2 from us-east.

This method guarantees that writes to the configuration store are possible even if one of these regions is unreachable.
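
The guarantee follows from ZooKeeper's majority rule: writes need a strict majority of the voting members, and observers do not vote. The following sketch checks the 3 + 2 + 2 layout described above by removing one region at a time. The variable names are illustrative.

```shell
# Voting members per region, as in the example above (3 + 2 + 2 = 7).
voters="us-west:3 us-central:2 us-east:2"

# Sum the voters and compute the strict majority that ZooKeeper needs.
total=0
for entry in $voters; do
  total=$((total + ${entry#*:}))
done
majority=$((total / 2 + 1))

# Take each region offline in turn and see whether a majority remains.
for entry in $voters; do
  region=${entry%%:*}
  remaining=$((total - ${entry#*:}))
  if [ "$remaining" -ge "$majority" ]; then
    echo "$region down: $remaining of $total voters left, quorum holds"
  else
    echo "$region down: $remaining of $total voters left, quorum lost"
  fi
done
```

With 7 voters the majority is 4, so losing even the largest region (us-west, 3 voters) still leaves enough members to accept writes.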

The ZK configuration in all the servers looks like:

  clientPort=2184
  server.1=zk1.us-west.example.com:2185:2186
  server.2=zk2.us-west.example.com:2185:2186
  server.3=zk3.us-west.example.com:2185:2186
  server.4=zk1.us-central.example.com:2185:2186
  server.5=zk2.us-central.example.com:2185:2186
  server.6=zk3.us-central.example.com:2185:2186:observer
  server.7=zk1.us-east.example.com:2185:2186
  server.8=zk2.us-east.example.com:2185:2186
  server.9=zk3.us-east.example.com:2185:2186:observer
  server.10=zk1.eu-central.example.com:2185:2186:observer
  server.11=zk2.eu-central.example.com:2185:2186:observer
  server.12=zk3.eu-central.example.com:2185:2186:observer
  server.13=zk1.ap-south.example.com:2185:2186:observer
  server.14=zk2.ap-south.example.com:2185:2186:observer
  server.15=zk3.ap-south.example.com:2185:2186:observer

Additionally, ZK observers need to have the following parameter:

  peerType=observer

Start the service

Once your configuration store configuration is in place, you can start up the service using pulsar-daemon:

  $ bin/pulsar-daemon start configuration-store

Cluster metadata initialization

Once you set up the cluster-specific ZooKeeper and configuration store quorums for your instance, you need to write some metadata to ZooKeeper for each cluster in your instance. You only need to write this metadata once.

You can initialize this metadata using the initialize-cluster-metadata command of the pulsar CLI tool. The following is an example:

  $ bin/pulsar initialize-cluster-metadata \
  --cluster us-west \
  --zookeeper zk1.us-west.example.com:2181 \
  --configuration-store zk1.us-west.example.com:2184 \
  --web-service-url http://pulsar.us-west.example.com:8080/ \
  --web-service-url-tls https://pulsar.us-west.example.com:8443/ \
  --broker-service-url pulsar://pulsar.us-west.example.com:6650/ \
  --broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651/

As you can see from the example above, you need to specify the following:

  • The name of the cluster
  • The local ZooKeeper connection string for the cluster
  • The configuration store connection string for the entire instance
  • The web service URL for the cluster
  • A broker service URL that enables interaction with the brokers in the cluster

If you use TLS, you also need to specify a TLS web service URL for the cluster as well as a TLS broker service URL for the brokers in the cluster.

Make sure to run initialize-cluster-metadata for each cluster in your instance.
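
One way to keep the per-cluster invocations consistent is to generate them from a single template. The sketch below only prints the commands so that you can review them before running anything. The helper name init_metadata_cmd is hypothetical, and the zk1.CLUSTER and pulsar.CLUSTER host names are assumed to follow the naming pattern of the us-west example. The TLS flags are left out for brevity.

```shell
# Hypothetical helper: print the initialization command for one cluster.
# Arguments: cluster name, configuration store connection string.
init_metadata_cmd() {
  cluster=$1
  config_store=$2
  echo "bin/pulsar initialize-cluster-metadata" \
    "--cluster $cluster" \
    "--zookeeper zk1.$cluster.example.com:2181" \
    "--configuration-store $config_store" \
    "--web-service-url http://pulsar.$cluster.example.com:8080/" \
    "--broker-service-url pulsar://pulsar.$cluster.example.com:6650/"
}

# Dry run: one command per cluster, all sharing the same configuration store.
for cluster in us-west us-east us-central eu-central ap-south; do
  init_metadata_cmd "$cluster" zk1.us-west.example.com:2184
done
```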

Deploy BookKeeper

BookKeeper provides persistent message storage for Pulsar.

Each Pulsar cluster needs to have its own cluster of bookies. The BookKeeper cluster shares a local ZooKeeper quorum with the Pulsar cluster.

You can configure BookKeeper bookies using the conf/bookkeeper.conf configuration file. The most important aspect of configuring each bookie is ensuring that the zkServers parameter is set to the connection string for the local ZooKeeper of the Pulsar cluster.
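
A ZooKeeper connection string is a comma-separated list of host:port pairs. As a rough sketch, it can be assembled from the local quorum's host names as follows. The helper name zk_connect_string is hypothetical, and zkServers is assumed to be the relevant key in conf/bookkeeper.conf.

```shell
# Hypothetical helper: join hosts into a ZooKeeper connection string.
# Arguments: client port, then one or more host names.
zk_connect_string() {
  port=$1
  shift
  result=
  for host in "$@"; do
    # Prefix a comma only when the result already has an entry.
    result="${result:+$result,}$host:$port"
  done
  printf '%s\n' "$result"
}

# Print the line that would go into the bookie configuration for us-west.
echo "zkServers=$(zk_connect_string 2181 \
  zk1.us-west.example.com zk2.us-west.example.com zk3.us-west.example.com)"
```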

Start bookies

You can start a bookie in two ways: in the foreground or as a background daemon.

To start a bookie in the background, use the pulsar-daemon CLI tool:

  $ bin/pulsar-daemon start bookie

You can verify that the bookie works properly using the bookiesanity command for the BookKeeper shell:

  $ bin/bookkeeper shell bookiesanity

This command creates a new ledger on the local bookie, writes a few entries, reads them back, and finally deletes the ledger.

After you have started all bookies, you can use the simpletest command for the BookKeeper shell on any bookie node to verify that all bookies in the cluster are running.

  $ bin/bookkeeper shell simpletest --ensemble <num-bookies> --writeQuorum <num-bookies> --ackQuorum <num-bookies> --numEntries <num-entries>
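
BookKeeper expects the ensemble size to be at least the write quorum, and the write quorum to be at least the ack quorum. The following sketch validates those sizes before printing the simpletest invocation. The helper name simpletest_cmd is hypothetical, and the command is printed rather than executed.

```shell
# Hypothetical helper: print a simpletest invocation after checking that
# ensemble >= writeQuorum >= ackQuorum.
# Arguments: ensemble size, write quorum, ack quorum, number of entries.
simpletest_cmd() {
  ensemble=$1 write_quorum=$2 ack_quorum=$3 entries=$4
  if [ "$ensemble" -lt "$write_quorum" ] || [ "$write_quorum" -lt "$ack_quorum" ]; then
    echo "invalid sizes: need ensemble >= writeQuorum >= ackQuorum" >&2
    return 1
  fi
  echo "bin/bookkeeper shell simpletest --ensemble $ensemble" \
    "--writeQuorum $write_quorum --ackQuorum $ack_quorum --numEntries $entries"
}

# Example: exercise all three bookies of a three-bookie cluster.
simpletest_cmd 3 3 3 100
```

Setting all three sizes to the number of bookies, as in the command above, forces every bookie to take part in each write.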

Bookie hosts are responsible for storing message data on disk. For bookies to provide optimal performance, a suitable hardware configuration is essential. The following are the key dimensions of bookie hardware capacity:

  • Disk I/O capacity (read/write)
  • Storage capacity

Message entries written to bookies are always synced to disk before returning an acknowledgement to the Pulsar broker. To ensure low write latency, BookKeeper is designed to use multiple devices:

  • A journal to ensure durability. For sequential writes, having fast fsync operations on bookie hosts is critical. Typically, small and fast solid-state drives (SSDs) should suffice, or hard disk drives (HDDs) with a RAID controller and a battery-backed write cache. Both solutions can reach fsync latency of ~0.4 ms.
  • A ledger storage device, where data is stored until all consumers acknowledge the message. Writes happen in the background, so write I/O is not a big concern. Reads happen sequentially most of the time, and the backlog is read only when consumers drain it. To store large amounts of data, a typical configuration involves multiple HDDs with a RAID controller.

Broker configuration

You can configure brokers using the conf/broker.conf configuration file.

The most important element of broker configuration is ensuring that each broker is aware of its local ZooKeeper quorum as well as the configuration store quorum. Make sure that you set the zookeeperServers parameter to reflect the local quorum and the configurationStoreServers parameter to reflect the configuration store quorum (although you need to specify only those ZooKeeper servers located in the same cluster).

You also need to specify the name of the cluster to which the broker belongs using the clusterName parameter. In addition, the broker and web service ports must match the ones you provided when you initialized the cluster metadata (especially when you use ports other than the defaults).

The following is an example configuration:

  # Local ZooKeeper servers
  zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
  # Configuration store quorum connection string.
  configurationStoreServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184
  clusterName=us-west
  # Broker data port
  brokerServicePort=6650
  # Broker data port for TLS
  brokerServicePortTls=6651
  # Port to use to serve HTTP requests
  webServicePort=8080
  # Port to use to serve HTTPS requests
  webServicePortTls=8443
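
Since the ports in the broker configuration have to agree with the URLs registered during metadata initialization, a quick comparison can catch mismatches early. The sketch below is one possible check. The helper names url_port, conf_value, and check_port are hypothetical, and the demo runs against a temporary file rather than a real broker configuration.

```shell
# Hypothetical helper: extract the port from a URL such as pulsar://host:6650/.
url_port() {
  printf '%s\n' "$1" | sed -n 's|^[a-z+]*://[^:/]*:\([0-9][0-9]*\).*$|\1|p'
}

# Hypothetical helper: read the value of a key=value line from a file.
# Arguments: configuration file, key.
conf_value() {
  sed -n "s/^$2=//p" "$1"
}

# Compare one configured port with the port in a metadata URL.
# Arguments: configuration file, key, URL.
check_port() {
  have=$(conf_value "$1" "$2")
  want=$(url_port "$3")
  if [ "$have" = "$want" ]; then
    echo "$2 matches ($have)"
  else
    echo "$2 mismatch: configuration has $have, URL has $want"
    return 1
  fi
}

# Demo against a throwaway file holding two values from the example above.
conf=$(mktemp)
printf 'brokerServicePort=6650\nwebServicePort=8080\n' > "$conf"
check_port "$conf" brokerServicePort pulsar://pulsar.us-west.example.com:6650/
check_port "$conf" webServicePort http://pulsar.us-west.example.com:8080/
rm -f "$conf"
```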

Pulsar brokers do not require any special hardware since they do not use the local disk. Choose fast CPUs and a 10 Gbps NIC so that the software can take full advantage of them.

Start the broker service

You can start a broker in the background by using nohup with the pulsar-daemon CLI tool:

  $ bin/pulsar-daemon start broker

You can also start brokers in the foreground by using pulsar broker:

  $ bin/pulsar broker

Service discovery

Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions below.

You can also use your own service discovery system if you want. If you use your own system, you need to satisfy only one requirement: when a client performs an HTTP request to an endpoint for a Pulsar cluster, such as http://pulsar.us-west.example.com:8080, the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

Service discovery setup

The service discovery mechanism included with Pulsar maintains a list of active brokers, which is stored in ZooKeeper, and supports lookup using HTTP as well as the binary protocol of Pulsar.

To set up the built-in service discovery of Pulsar, you need to change a few parameters in the conf/discovery.conf configuration file. Set the zookeeperServers parameter to the ZooKeeper quorum connection string of the cluster and the configurationStoreServers setting to the configuration store quorum connection string.

  # Zookeeper quorum connection string
  zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
  # Global configuration store connection string
  configurationStoreServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184

To start the discovery service:

  $ bin/pulsar-daemon start discovery

Admin client and verification

At this point your Pulsar instance should be ready to use. You can now configure client machines that can serve as administrative clients for each cluster. You can use the conf/client.conf configuration file to configure admin clients.

The most important thing is that you point the serviceUrl parameter to the correct service URL for the cluster:

  serviceUrl=http://pulsar.us-west.example.com:8080/

Pulsar is built as a fundamentally multi-tenant system.

If a new tenant wants to use the system, you need to create it first. You can create a new tenant by using the pulsar-admin CLI tool:

  $ bin/pulsar-admin tenants create test-tenant \
  --allowed-clusters us-west \
  --admin-roles test-admin-role

In this command, users who identify with the test-admin-role role can administer the configuration for the test-tenant tenant. The test-tenant tenant can only use the us-west cluster. From now on, this tenant can manage its resources.

Once you create a tenant, you need to create namespaces for topics within that tenant.

The first step is to create a namespace. A namespace is an administrative unit that can contain many topics. A common practice is to create a namespace for each different use case from a single tenant.

  $ bin/pulsar-admin namespaces create test-tenant/ns1

Test producer and consumer

Everything is now ready to send and receive messages. The quickest way to test the system is through the pulsar-perf client tool.

You can use a topic in the namespace that you have just created. Topics are automatically created the first time a producer or a consumer tries to use them.

The topic name in this case could be:

  persistent://test-tenant/ns1/my-topic
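
The topic name is made up of the tenant, the namespace, and the topic itself. As a small sketch, it can be composed from those three parts so that test scripts stay consistent. The helper name topic_name is hypothetical.

```shell
# Hypothetical helper: build a persistent topic name from its three parts.
# Arguments: tenant, namespace, topic.
topic_name() {
  printf 'persistent://%s/%s/%s\n' "$1" "$2" "$3"
}

# Compose the example topic and show where it would be used.
topic=$(topic_name test-tenant ns1 my-topic)
echo "topic: $topic"
```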

Start a consumer that creates a subscription on the topic and waits for messages:

  $ bin/pulsar-perf consume persistent://test-tenant/ns1/my-topic

Start a producer that publishes messages at a fixed rate and reports stats every 10 seconds:

  $ bin/pulsar-perf produce persistent://test-tenant/ns1/my-topic

To report the topic stats:

  $ bin/pulsar-admin topics stats persistent://test-tenant/ns1/my-topic