All the methods in producer, consumer, and reader of a C++ client are thread-safe.
The Pulsar C++ client is supported on Linux and macOS platforms.
Doxygen-generated API docs for the C++ client are available here.
Linux
Four kinds of libraries, libpulsar.so / libpulsarnossl.so / libpulsar.a / libpulsarwithdeps.a, are included in your /usr/lib after you download and install the rpm/deb package. By default, they are built under the code path ${PULSAR_HOME}/pulsar-client-cpp using the following command:
cmake . -DBUILD_TESTS=OFF -DLINK_STATIC=ON && make pulsarShared pulsarSharedNossl pulsarStatic pulsarStaticWithDeps -j 3
These libraries rely on some other libraries. If you want the detailed versions of the dependency libraries, refer to these .
libpulsar.so is the shared library. It contains statically linked boost and openssl, and dynamically links all other needed libraries. The command for using this library looks like this:
libpulsarnossl.so is a shared library similar to libpulsar.so, except that the openssl and crypto libraries are dynamically linked. The command for using this library looks like this:
g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarnossl.so -lssl -lcrypto -I/usr/local/ssl/include -L/usr/local/ssl/lib
libpulsar.a is the static library. You need to link its dependency libraries yourself when using it. The command for using this library looks like this:
g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib -lboost_system -lboost_regex -lcurl -lprotobuf -lzstd -lz
libpulsarwithdeps.a is a static library based on libpulsar.a, with the dependency libraries libboost_regex, libboost_system, libcurl, libprotobuf, libzstd, and libz archived into it. The command for using this library looks like this:
g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarwithdeps.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib
libpulsarwithdeps.a does not include the openssl-related libraries libssl and libcrypto. Because these two libraries are security-related, it is more reasonable to use the versions provided by the local system, which also makes it easier to handle security issues and library upgrades.
- Download an RPM package from the links in the table.
- Install the package using the following command.
$ rpm -ivh apache-pulsar-client*.rpm
After installation, the Pulsar libraries are placed under /usr/lib.
- Download a Debian package from the links in the table.
- Install the package using the following command:
$ apt install ./apache-pulsar-client*.deb
After installation, the Pulsar libraries are placed under /usr/lib.
There are recipes that build RPM and Debian packages containing statically linked libpulsar.so / libpulsarnossl.so / libpulsar.a / libpulsarwithdeps.a with all the required dependencies.
To build the C++ library packages, build the Java packages first.
RPM
pulsar-client-cpp/pkg/rpm/docker-build-rpm.sh
This builds the RPM inside a Docker container and it leaves the RPMs in .
Debian
To build Debian packages, enter the following command.
pulsar-client-cpp/pkg/deb/docker-build-deb.sh
Debian packages are created at pulsar-client-cpp/pkg/deb/BUILD/DEB/.
Pulsar releases are available in the Homebrew core repository. You can install the C++ client library with the following command. The package is installed with the library and headers.
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters and use the pulsar URI scheme. The default port is 6650. The following is an example for localhost.
pulsar://localhost:6650
pulsar://pulsar.us-west.example.com:6650
If you use TLS authentication, you need to add ssl to the scheme, and the default port is 6651. The following is an example.
pulsar+ssl://pulsar.us-west.example.com:6651
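The scheme and default-port rules above can be captured in a small helper. This is only a sketch: makeServiceUrl is a hypothetical name and is not part of the Pulsar client API. It assumes that passing 0 as the port means "use the default for the chosen scheme".

```cpp
#include <string>

// Hypothetical helper, not part of the Pulsar client API.
// Builds a service URL from a host name, picking the scheme and the
// default port described above (6650 for plain, 6651 for TLS).
// A port of 0 means "use the default for the chosen scheme".
std::string makeServiceUrl(const std::string& host, bool useTls, int port = 0) {
    const std::string scheme = useTls ? "pulsar+ssl://" : "pulsar://";
    if (port == 0) {
        port = useTls ? 6651 : 6650;
    }
    return scheme + host + ":" + std::to_string(port);
}
```

The returned string can be passed directly to the Client constructor, for example Client client(makeServiceUrl("localhost", false));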
Create a consumer
To connect to Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example.
Client client("pulsar://localhost:6650");

Consumer consumer;
Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
if (result != ResultOk) {
    LOG_ERROR("Failed to subscribe: " << result);
    return -1;
}

Message msg;

// Receive and acknowledge messages in an endless loop
while (true) {
    consumer.receive(msg);
    LOG_INFO("Received: " << msg
             << " with payload '" << msg.getDataAsString() << "'");

    consumer.acknowledge(msg);
}

// Not reached while the loop above runs forever
client.close();
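The receive loop above never ends, so the final client.close() is never reached. One possible variation is a bounded loop that stops after a fixed number of messages or when a receive call fails. The sketch below assumes the timed overload Consumer::receive(msg, timeoutMs). It is written as a template so that it does not depend on the client headers. receiveUpTo and its parameters are hypothetical names used only for illustration.

```cpp
// Hypothetical helper, not part of the Pulsar client API.
// Receives at most maxMessages messages, waiting up to timeoutMs for each
// one, and acknowledges every message it gets. Returns the number received.
// ConsumerT must provide receive(MessageT&, int) and acknowledge(const MessageT&);
// okValue is the result meaning success (ResultOk for the real client).
template <typename MessageT, typename ConsumerT, typename ResultT>
int receiveUpTo(ConsumerT& consumer, ResultT okValue, int maxMessages, int timeoutMs) {
    int received = 0;
    while (received < maxMessages) {
        MessageT msg;
        if (consumer.receive(msg, timeoutMs) != okValue) {
            break;  // timeout or error: stop instead of blocking forever
        }
        consumer.acknowledge(msg);
        ++received;
    }
    return received;
}
```

With the real client this might be called as receiveUpTo<Message>(consumer, ResultOk, 10, 1000), after which client.close() can actually run.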
Create a producer
To connect to Pulsar as a producer, you need to create a producer on the C++ client. The following is an example.
Client client("pulsar://localhost:6650");

Producer producer;
Result result = client.createProducer("my-topic", producer);
if (result != ResultOk) {
    LOG_ERROR("Error creating producer: " << result);
    return -1;
}

// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
    Message msg = MessageBuilder().setContent("my-message").build();
    Result res = producer.send(msg);
    LOG_INFO("Message sent: " << res);
}

client.close();
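The publish loop above logs each send result but keeps going even if a send fails. A caller that wants to stop at the first failure could use something like the following sketch. sendAll is a hypothetical helper, not part of the client API. It is written as a template so that it compiles without the client headers, and it assumes only that the producer has a send(message) method returning a result value.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper, not part of the Pulsar client API.
// Sends each payload in order and stops at the first failure.
// Returns how many messages were sent successfully.
// makeMessage turns a payload string into a message object;
// okValue is the result meaning success (ResultOk for the real client).
template <typename ProducerT, typename MakeMessage, typename ResultT>
std::size_t sendAll(ProducerT& producer,
                    const std::vector<std::string>& payloads,
                    MakeMessage makeMessage,
                    ResultT okValue) {
    std::size_t sent = 0;
    for (const std::string& payload : payloads) {
        if (producer.send(makeMessage(payload)) != okValue) {
            break;  // leave the remaining payloads unsent
        }
        ++sent;
    }
    return sent;
}
```

With the real client, makeMessage could be a lambda that returns MessageBuilder().setContent(payload).build(), and okValue would be ResultOk.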
If you use TLS authentication when connecting to Pulsar, you need to add ssl in the connection URLs, and the default port is 6651. The following is an example.
ClientConfiguration config = ClientConfiguration();
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
config.setTlsAllowInsecureConnection(false);
config.setAuth(pulsar::AuthTls::create(
    "/path/to/client-cert.pem", "/path/to/client-key.pem"));
Client client("pulsar+ssl://my-broker.com:6651", config);
For complete examples, refer to .
Schema
This section shows some schema examples. For more information about schema, see .
The following example shows how to create a producer with an Avro schema.
static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.createProducer("topic-avro", producerConf, producer);
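The escaped JSON in the schema definition above is easy to get wrong. As an optional alternative, the same string can be written with C++11 raw string literals, which need no backslash escapes. The name exampleSchemaRaw is illustrative only. The value is intended to be character-for-character identical to exampleSchema.

```cpp
#include <string>

// Same Avro schema as above, written with raw string literals so the JSON
// quotes need no escaping. Adjacent literals are concatenated by the compiler.
static const std::string exampleSchemaRaw =
    R"({"type":"record","name":"Example","namespace":"test",)"
    R"("fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
```

It can be passed to SchemaInfo in the same way as exampleSchema.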
The following example shows how to create a consumer with an Avro schema.
static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
ConsumerConfiguration consumerConf;
Consumer consumer;