Pulsar C++ client

    All methods of the producer, consumer, and reader in the C++ client are thread-safe.

    The Pulsar C++ client is supported on Linux, macOS, and Windows.

    Doxygen-generated API documentation is available for the C++ client.

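    System requirements

    Install the required components before using the C++ client. The steps for each platform are described below.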

    Linux

    1. Clone the Pulsar repository.

    2. Install all necessary dependencies.

        $ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
            libprotobuf-dev protobuf-compiler libboost-all-dev google-mock libgtest-dev libjsoncpp-dev

    3. Compile and install Google Test.

        # libgtest-dev version is 1.8.0 or above
        $ cd /usr/src/googletest
        $ sudo cmake .
        $ sudo make
        $ sudo cp ./googlemock/libgmock.a ./googlemock/gtest/libgtest.a /usr/lib/

        # libgtest-dev version is less than 1.8.0
        $ cd /usr/src/gtest
        $ sudo cmake .
        $ sudo make
        $ sudo cp libgtest.a /usr/lib
        $ cd /usr/src/gmock
        $ sudo cmake .
        $ sudo make
        $ sudo cp libgmock.a /usr/lib

    4. Compile the Pulsar C++ client library in the Pulsar repository.

        $ cd pulsar-client-cpp
        $ cmake .
        $ make

    After you install the components successfully, the files libpulsar.so and libpulsar.a are in the lib folder of the repository. The tools perfProducer and perfConsumer are in the perf directory.

    Install dependencies

    After you download and install the RPM or DEB package, the libpulsar.so, libpulsarnossl.so, libpulsar.a, and libpulsarwithdeps.a libraries are in your /usr/lib directory.

    By default, they are built under ${PULSAR_HOME}/pulsar-client-cpp. You can build them with the following command.

        cmake . -DBUILD_TESTS=OFF -DLINK_STATIC=ON && make pulsarShared pulsarSharedNossl pulsarStatic pulsarStaticWithDeps -j 3

    These libraries depend on some other libraries. For the detailed dependency versions, see the RPM or DEB files.

    1. libpulsar.so is a shared library that contains statically linked boost and openssl, and dynamically links all other necessary libraries. You can use this Pulsar library with the following command.

        g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.so -I/usr/local/ssl/include

    2. libpulsarnossl.so is a shared library similar to libpulsar.so, except that the openssl and crypto libraries are linked dynamically. You can use this Pulsar library with the following command.

        g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarnossl.so -lssl -lcrypto -I/usr/local/ssl/include -L/usr/local/ssl/lib

    3. libpulsar.a is a static library. You need to load its dependencies before using it. You can use this Pulsar library with the following command.

        g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib -lboost_system -lboost_regex -lcurl -lprotobuf -lzstd -lz

    4. libpulsarwithdeps.a is a static library based on libpulsar.a. It is archived together with libboost_regex, libboost_system, libcurl, libprotobuf, libzstd, and libz. You can use this Pulsar library with the following command.

        g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarwithdeps.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib

    libpulsarwithdeps.a does not include the openssl-related libraries libssl and libcrypto, because these two libraries are security-related. Using the versions provided by the local system makes it simpler to handle security issues and to upgrade the libraries.

    Install RPM

    1. Download an RPM package from the links in the table.

    2. Install the package using the following command.

        $ rpm -ivh apache-pulsar-client*.rpm

    After you install the RPM successfully, the Pulsar libraries are in the /usr/lib directory.

    Install Debian

    1. Download a Debian package from the links in the table.

    2. Install the package using the following command.

        $ apt install ./apache-pulsar-client*.deb

    After you install the DEB package successfully, the Pulsar libraries are in the /usr/lib directory.

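    Build

    You can build binary packages that contain the libpulsar.so, libpulsar.a, libpulsarnossl.so, and libpulsarwithdeps.a libraries together with all required dependencies, as described below.

    To build the C++ library packages, you need to build the Java packages first.

        mvn install -DskipTests

    RPM

        pulsar-client-cpp/pkg/rpm/docker-build-rpm.sh

    Debian

    To build Debian packages, enter the following command.

    The built DEB packages are saved in the pulsar-client-cpp/pkg/deb/BUILD/DEB/ folder.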

    macOS

    1. Clone the Pulsar repository.

        $ git clone https://github.com/apache/pulsar

    2. Install all necessary dependencies.

        # OpenSSL installation
        $ brew install openssl
        $ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/
        $ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/

        # Protocol Buffers installation
        $ brew install protobuf boost boost-python log4cxx
        # If you are using python3, you need to install boost-python3

        # Google Test installation
        $ git clone https://github.com/google/googletest.git
        $ cd googletest
        $ cmake .
        $ make install

    3. Compile the Pulsar client library in the repository that you cloned.

        $ cd pulsar-client-cpp
        $ cmake .
        $ make

    Install libpulsar

    Pulsar releases are available in the Homebrew core repository. You can install the C++ client library with the following command. The package installs both the library and the headers.

        brew install libpulsar

    Windows (x64)

    Build

    1. Clone the Pulsar repository.

        $ git clone https://github.com/apache/pulsar

    2. Install all necessary dependencies.

        cd ${PULSAR_HOME}/pulsar-client-cpp

    3. Build the C++ libraries.

        cmake --build ./build --config Release

    4. The client libraries are available in the following locations.

        ${PULSAR_HOME}/pulsar-client-cpp/build/lib/Release/pulsar.lib
        ${PULSAR_HOME}/pulsar-client-cpp/build/lib/Release/pulsar.dll

    Connection URLs

    To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

    Pulsar protocol URLs are assigned to specific clusters and use the pulsar URI scheme. The default port is 6650. The following is an example for localhost.

        pulsar://localhost:6650

    In a production Pulsar cluster, the URL looks like the following.

        pulsar://pulsar.us-west.example.com:6650

    If you use TLS authentication, add +ssl to the scheme. The default port is then 6651. The following is an example.

        pulsar+ssl://pulsar.us-west.example.com:6651
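The scheme and default-port rules described above can be illustrated with a small, dependency-free sketch. The `ServiceUrl` struct and the `parseServiceUrl` function are hypothetical helpers written for this illustration; they are not part of the Pulsar C++ client, which takes the URL string directly. The sketch assumes a single `host[:port]` after the scheme and does not handle IPv6 literals or other URL forms.

```cpp
#include <exception>
#include <string>

// Hypothetical helper type for illustration; not a Pulsar client class.
struct ServiceUrl {
    std::string host;
    int port = 0;
    bool tls = false;
};

// Parses "pulsar://host[:port]" or "pulsar+ssl://host[:port]".
// Returns false for any other scheme, an empty host, or a non-numeric port.
bool parseServiceUrl(const std::string& url, ServiceUrl& out) {
    static const std::string plain = "pulsar://";
    static const std::string secure = "pulsar+ssl://";

    ServiceUrl parsed;
    std::string rest;
    if (url.compare(0, secure.size(), secure) == 0) {
        parsed.tls = true;
        rest = url.substr(secure.size());
    } else if (url.compare(0, plain.size(), plain) == 0) {
        parsed.tls = false;
        rest = url.substr(plain.size());
    } else {
        return false;
    }

    // Default ports as stated in the text: 6650 without TLS, 6651 with TLS.
    parsed.port = parsed.tls ? 6651 : 6650;

    const std::string::size_type colon = rest.find(':');
    parsed.host = rest.substr(0, colon);  // whole string when there is no colon
    if (colon != std::string::npos) {
        try {
            parsed.port = std::stoi(rest.substr(colon + 1));
        } catch (const std::exception&) {
            return false;  // missing or non-numeric port
        }
    }
    if (parsed.host.empty()) {
        return false;
    }
    out = parsed;
    return true;
}
```

For instance, calling `parseServiceUrl("pulsar+ssl://pulsar.us-west.example.com", u)` fills `u` with the TLS default port 6651, while an `http://` URL is rejected.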

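    Create a consumer

    To use Pulsar as a consumer, you need to create a consumer on the C++ client. There are two main ways of using the consumer:

    • Blocking style: synchronously calling receive(msg).
    • Non-blocking (event based) style: using a message listener.

    Blocking example

    The benefit of this approach is that it yields the simplest code: keep calling receive(msg), which blocks until a message is received.

    This example starts a subscription at the earliest offset and consumes 100 messages.

    Consumer with a message listener

    Instead of running a blocking loop, you can use an event-based style in which a message listener is invoked for each message that is received.

    This example starts a subscription at the earliest offset and consumes 100 messages.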

        #include <pulsar/Client.h>
        #include <atomic>
        #include <chrono>
        #include <iostream>
        #include <thread>

        using namespace pulsar;

        // Number of messages handled by the listener so far.
        std::atomic<uint32_t> messagesReceived;

        void handleAckComplete(Result res) {
            std::cout << "Ack res: " << res << std::endl;
        }

        // Invoked by the client for each message that arrives.
        void listener(Consumer consumer, const Message& msg) {
            std::cout << "Got message " << msg << " with content '" << msg.getDataAsString() << "'" << std::endl;
            messagesReceived++;
            consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete);
        }

        int main() {
            Client client("pulsar://localhost:6650");

            Consumer consumer;
            ConsumerConfiguration config;
            config.setMessageListener(listener);
            config.setSubscriptionInitialPosition(InitialPositionEarliest);
            Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
            if (result != ResultOk) {
                std::cout << "Failed to subscribe: " << result << std::endl;
                return -1;
            }

            // wait for 100 messages to be consumed
            while (messagesReceived < 100) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }

            std::cout << "Finished consuming asynchronously!" << std::endl;

            client.close();
            return 0;
        }

    Create a producer

    To use Pulsar as a producer, you need to create a producer on the C++ client. There are two main ways of using a producer:

    • Blocking style: each call to send waits for an ack from the broker.
    • Non-blocking style: sendAsync is called instead of send, and a callback is supplied that is invoked when the ack is received from the broker.

    Blocking example

    This example sends 100 messages using the blocking style. While simple, it does not produce high throughput because it waits for each ack to come back before sending the next message.

        #include <pulsar/Client.h>
        #include <chrono>
        #include <iostream>
        #include <string>
        #include <thread>

        using namespace pulsar;

        int main() {
            Client client("pulsar://localhost:6650");

            // The producer handle is filled in by createProducer.
            Producer producer;
            Result result = client.createProducer("persistent://public/default/my-topic", producer);
            if (result != ResultOk) {
                std::cout << "Error creating producer: " << result << std::endl;
                return -1;
            }

            // Send 100 messages synchronously
            int ctr = 0;
            while (ctr < 100) {
                std::string content = "msg" + std::to_string(ctr);
                Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
                Result sendResult = producer.send(msg);
                if (sendResult != ResultOk) {
                    std::cout << "The message " << content << " could not be sent, received code: " << sendResult << std::endl;
                } else {
                    std::cout << "The message " << content << " sent successfully" << std::endl;
                }

                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                ctr++;
            }

            std::cout << "Finished producing synchronously!" << std::endl;

            client.close();
            return 0;
        }

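    Non-blocking example

    This example sends 100 messages using the non-blocking style, calling sendAsync instead of send. This allows the producer to have multiple messages in flight at the same time, which increases throughput.

    The producer configuration blockIfQueueFull is useful here to avoid ResultProducerQueueIsFull errors when the internal queue for outgoing send requests becomes full. Once the internal queue is full, sendAsync becomes blocking, which can make your code simpler.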

        #include <pulsar/Client.h>
        #include <atomic>
        #include <chrono>
        #include <functional>
        #include <iostream>
        #include <string>
        #include <thread>

        using namespace pulsar;

        // Number of acks received from the broker so far.
        std::atomic<uint32_t> acksReceived;

        void callback(Result code, const MessageId& msgId, std::string msgContent) {
            std::cout << "Received ack for msg: " << msgContent << " with code: "
                      << code << " -- MsgID: " << msgId << std::endl;
            acksReceived++;
        }

        int main() {
            Client client("pulsar://localhost:6650");

            ProducerConfiguration producerConf;
            producerConf.setBlockIfQueueFull(true);
            Producer producer;
            Result result = client.createProducer("persistent://public/default/my-topic",
                                                  producerConf, producer);
            if (result != ResultOk) {
                std::cout << "Error creating producer: " << result << std::endl;
                return -1;
            }

            // Send 100 messages asynchronously
            int ctr = 0;
            while (ctr < 100) {
                std::string content = "msg" + std::to_string(ctr);
                Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
                producer.sendAsync(msg, std::bind(callback,
                                                  std::placeholders::_1, std::placeholders::_2, content));

                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                ctr++;
            }

            // wait for 100 messages to be acked
            while (acksReceived < 100) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }

            std::cout << "Finished producing asynchronously!" << std::endl;

            client.close();
            return 0;
        }
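The non-blocking example hands `sendAsync` a three-argument `callback` by binding its third argument with `std::bind`, so that the resulting callable takes only the result and the message ID. The following standalone sketch isolates that adaptation using only the standard library. `FakeResult`, `FakeMessageId`, `SendCallback`, `onSend`, and `makeCallback` are hypothetical stand-ins invented for this illustration; they are not Pulsar types or functions.

```cpp
#include <functional>
#include <string>
#include <vector>

// Stand-in types for illustration only; they are not Pulsar classes.
using FakeResult = int;
using FakeMessageId = long;
using SendCallback = std::function<void(FakeResult, const FakeMessageId&)>;

// Records the content and ID of every successfully acknowledged message.
std::vector<std::string> acked;

// Three-argument handler: the third argument carries extra context.
void onSend(FakeResult code, const FakeMessageId& id, std::string content) {
    if (code == 0) {
        acked.push_back(content + "@" + std::to_string(id));
    }
}

// Adapts onSend to the two-argument SendCallback by binding a copy of
// the content now; the two placeholders are filled in when it is invoked.
SendCallback makeCallback(const std::string& content) {
    return std::bind(onSend, std::placeholders::_1, std::placeholders::_2, content);
}
```

Invoking `makeCallback("msg0")(0, 42L)` appends `"msg0@42"` to `acked`. A lambda that captures `content` by value would be an equivalent way to write the same adapter.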

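    Partitioned topics and lazy producers

    When scaling out a Pulsar topic, you can configure the topic to have hundreds of partitions. Likewise, you can scale out your producers to hundreds or even thousands of producers. This can put some strain on the Pulsar brokers, because when you create a producer on a partitioned topic, one internal producer is created for each partition, and each of these involves communication with the brokers. So for a topic with 1000 partitions and 1000 producers, 1,000,000 internal producers end up being created across the producer applications, and each of them has to communicate with a broker to find out which broker it should connect to and then perform the connection handshake.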

    You can reduce the load caused by this combination of a large number of partitions and many producers by doing the following:

    • use SinglePartition partition routing mode (this ensures that all messages are only sent to a single, randomly selected partition)
    • use non-keyed messages (when messages are keyed, routing is based on the hash of the key and so messages will end up being sent to multiple partitions)
    • use lazy producers (this ensures that an internal producer is only created on demand when a message needs to be routed to a partition)

    Using the example above, this reduces the number of internal producers spread across the 1000 producer applications from 1,000,000 to 1000.

    Note that there can be extra latency for the first message sent. If you set a low send timeout, this timeout could be reached if the initial connection handshake is slow to complete.

        ProducerConfiguration producerConf;
        producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
        producerConf.setLazyStartPartitionedProducers(true);
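The arithmetic behind the numbers quoted in this section can be written out as a short sketch. The two function names are hypothetical and exist only for this illustration; the lazy figure assumes the conditions listed above (SinglePartition routing, non-keyed messages, and lazy producers), so that each application-level producer ends up with one internal producer.

```cpp
#include <cstdint>

// Eager creation: every application-level producer opens one internal
// producer per partition of the topic.
std::uint64_t eagerInternalProducers(std::uint64_t producers, std::uint64_t partitions) {
    return producers * partitions;
}

// Lazy creation with SinglePartition routing and non-keyed messages:
// each application-level producer only creates the one internal producer
// for the partition that it actually sends to.
std::uint64_t lazySinglePartitionInternalProducers(std::uint64_t producers) {
    return producers;
}
```

With 1000 producers and 1000 partitions, the first function gives 1,000,000 and the second gives 1000, matching the figures in the text.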

    Enable authentication in connection URLs

    If you use TLS authentication when connecting to Pulsar, you need to add ssl in the connection URLs, and the default port is 6651. The following is an example.

        ClientConfiguration config = ClientConfiguration();
        config.setUseTls(true);
        config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
        config.setTlsAllowInsecureConnection(false);
        config.setAuth(pulsar::AuthTls::create(
            "/path/to/client-cert.pem", "/path/to/client-key.pem"));
        Client client("pulsar+ssl://my-broker.com:6651", config);

    Complete examples are available for the C++ client.

    Schema

    This section provides some schema examples. For more information about schemas, see Pulsar schema.

    Avro schema

    • The following example shows how to create a producer with an Avro schema.

          static const std::string exampleSchema =
              "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
              "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
          Producer producer;
          ProducerConfiguration producerConf;
          producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
          client.createProducer("topic-avro", producerConf, producer);

    • The following example shows how to create a consumer with an Avro schema.

          static const std::string exampleSchema =
              "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
              "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
          ConsumerConfiguration consumerConf;
          Consumer consumer;
          consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
          client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
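The escaped JSON in the Avro examples is hard to read. As a possible alternative, a C++11 raw string literal can express the same schema without backslashes. The sketch below uses only the standard library and compares the two spellings; the names `escapedSchema`, `rawSchema`, and `schemasMatch` are chosen for this illustration only.

```cpp
#include <string>

// Schema written with escaped quotes, as in the examples above.
static const std::string escapedSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";

// The same schema as two adjacent raw string literals; no escaping is needed
// because everything between R"( and )" is taken literally.
static const std::string rawSchema =
    R"({"type":"record","name":"Example","namespace":"test",)"
    R"("fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";

// Returns true when both spellings produce identical strings.
bool schemasMatch() {
    return escapedSchema == rawSchema;
}
```

Either string can then be passed wherever the examples use `exampleSchema`, since both hold the same JSON text.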

    ProtobufNative schema

    The following example shows how to create a producer and a consumer with a ProtobufNative schema.

    1. Generate the User class using Protobuf3.

    2. Include ProtobufNativeSchema.h in your source code. Ensure that the Protobuf dependency has been added to your project.

          #include <pulsar/ProtobufNativeSchema.h>

    3. Create a producer to send a User instance.

          ProducerConfiguration producerConf;
          producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
          Producer producer;
          client.createProducer("topic-protobuf", producerConf, producer);
          User user;
          user.set_name("my-name");
          user.set_age(10);
          std::string content;
          user.SerializeToString(&content);
          producer.send(MessageBuilder().setContent(content).build());

    4. Create a consumer to receive a User instance.

          ConsumerConfiguration consumerConf;
          consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
          consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
          Consumer consumer;
          client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer);
          Message msg;
          consumer.receive(msg);
          User user2;