Node.js 客户端中 、consumer 和 中的所有方法都对线程可用。
You can install the library via .
Pulsar Node.js client library is based on the C++ client library. Follow these instructions and install the Pulsar C++ client library.
兼容性
Compatibility between each version of the Node.js client and the C++ client is as follows:
If an incompatible version of the C++ client is installed, you may fail to build or run this library.
使用 npm 安装
Install the pulsar-client
library via :
连接URL
To connect to Pulsar using client libraries, you need to specify a URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar
scheme and have a default port of 6650. Here is an example for localhost
:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you are using TLS encryption or , the URL will look like something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
Here is an example:
客户端配置
The following configurable parameters are available for Pulsar clients:
Parameter | Description | 默认值 |
---|---|---|
serviceUrl | The connection URL for the Pulsar cluster. 更多详细信息,参阅。 | |
authentication | Configure the authentication provider. (default: no authentication). 更多详细信息,参阅 TLS 认证。 | |
operationTimeoutSeconds | Node.js 客户端操作的超时时间(创建 producer、订阅/取消订阅 )。 Retries will occur until this threshold is reached, at which point the operation will fail. | 30 |
ioThreads | 用于处理 Pulsar broker 连接的线程数。 | 1 |
messageListenerThreads | 和 readers 监听消息使用的线程数。 | 1 |
concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. 只有当客户端需要生产及/或订阅数千 Pulsar topic 时,才需要修改预设的默认值:50000 。 | 50000 |
tlsTrustCertsFilePath | 信任的 TLS 证书的文件路径。 | |
tlsValidateHostname | 设置是否启用 TLS 主机名验证。 | false |
tlsAllowInsecureConnection | 设置是否让 Pulsar 客户端从 broker 接受不信任的 TLS 证书。 | false |
statsIntervalInSeconds | 两次信息统计之间的时间间隔。 当 statsInterval 为正数时启用信息统计。 最小值为 1 秒。 | 600 |
Producers
Pulsar producers publish messages to Pulsar topics. You can configure Node.js producers using a producer configuration object.
Here is an example:
const producer = await client.createProducer({
topic: 'my-topic',
});
await producer.send({
data: Buffer.from("Hello, Pulsar"),
});
await producer.close();
Producer operations
Pulsar Node.js producers have the following methods available:
Method | Description | Return type |
---|---|---|
send(Object) | Publishes a message to the producer’s topic. 当 Pulsar broker 成功确认消息,或是准备抛出异常时,Promise 对象会运行 executor 函数。 | Promise<null> |
flush() | 从发送队列发送消息给 Pulsar broker。 当 Pulsar broker 成功确认消息,或是准备抛出异常时,Promise 对象会运行 executor 函数。 | Promise<null> |
close() | Closes the producer and releases all resources allocated to it. 如果调用 close() ,则不再接收发布者的消息。 此方法将返回 Promise 对象,并在 Pulsar 持久化所有待发布请求时运行 executor 函数。 If an error is thrown, no pending writes will be retried. | Promise<null> |
Parameter | Description | 默认值 |
---|---|---|
topic | Producer 发布消息的目标 Pulsar 。 | |
producerName | A name for the producer. 如果没有指定名称,Pulsar 将自动生成全局唯一的名称。 如果指定名称,则指定的名称必须在所有 Pulsar 集群中唯一,否则创建操作将会抛出异常。 | |
sendTimeoutMs | When publishing a message to a topic, the producer will wait for an acknowledgment from the responsible Pulsar broker. If a message is not acknowledged within the threshold set by this parameter, an error will be thrown. 如果将 sendTimeoutMs 设为 -1,那么超时时间将为无穷大(因而移除超时限定)。 Removing the send timeout is recommended when using Pulsar’s feature. | 30000 |
initialSequenceId | 消息的初始序列号 Producer 在发送消息时,将序列号添加到消息中。 只要发送消息,序列号就会相应增加。 | |
maxPendingMessages | The maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). 默认情况下,当队列已满时,不能继续调用 send 方法,除非将 blockIfQueueFull 设置为 true 。 | 1000 |
maxPendingMessagesAcrossPartitions | 所有分区等待队列空间总和的最大值。 | 50000 |
blockIfQueueFull | 如果设置为 true ,当消息传出队列已满时,producer 的 send 方法会进行等待,而不是抛出异常(该队列大小由 maxPendingMessages 参数决定);如果设置为 false (默认值),当队列已满时,不能调用 send ,并抛出异常。 | false |
messageRoutingMode | 消息路由逻辑( 上的 producer 使用)。 This logic is applied only when no key is set on messages. 可以设置为循环(RoundRobinDistribution ),或是发布所有消息到单个分区 (UseSinglePartition ,默认值)。 | UseSinglePartition |
hashingScheme | The hashing function that determines the partition on which a particular message is published (partitioned topics only). 可以设置为 JavaStringHash (等同于 Java 中的 String.hashCode() )、Murmur3_32Hash (应用了 Murmur3 散列函数)、或 BoostHash (应用了 C++ 中 库的散列函数)。 | BoostHash |
compressionType | The message data compression type used by the producer. 可以设置为 LZ4 和 。 | Compression None |
batchingEnabled | 如果设置为 true ,producer 将批量发送消息。 | true |
batchingMaxPublishDelayMs | 批量发送消息的最大延迟。 | 10 |
batchingMaxMessages | 批量发送消息时,消息大小的最大值。 | 1000 |
properties | Producer 的元数据。 |
生产者示例
This example creates a Node.js producer for the my-topic
topic and sends 10 messages to that topic:
const Pulsar = require('pulsar-client');
(async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
// Create a producer
const producer = await client.createProducer({
topic: 'my-topic',
});
// Send messages
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();
await producer.close();
await client.close();
})();
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can Node.js consumers using a consumer configuration object.
Here is an example:
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
});
const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
await consumer.close();
Consumer operations
Pulsar Node.js consumers have the following methods available:
Consumer configuration
Parameter | Description | 默认值 |
---|---|---|
topic | Consumer 创建订阅并监听消息的目标 Pulsar topic。 | |
subscription | Consumer 的订阅名称。 | |
subscriptionType | 可以设置为独占 、共享 或灾备 。 | Exclusive |
ackTimeoutMs | 消息确认的超时时间(以毫秒为单位)。 | 0 |
receiverQueueSize | 设置 consumer 接收队列的大小,即在应用程序调用 receive 之前允许堆积的消息数。 A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000 |
receiverQueueSizeAcrossPartitions | Set the max total receiver queue size across partitions. 当接收队列总数超过此值时,将减小每个分区接收队列的大小。 | 50000 |
consumerName | Consumer 的名称。 当前(v2.4.1)版本中, 模式按照 consumer 名称排序。 | |
properties | Consumer 的元数据。 |
消费者示例
Readers
Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recently unacked message). You can configure Node.js readers using a reader configuration object.
Here is an example:
const reader = await client.createReader({
topic: 'my-topic',
startMessageId: Pulsar.MessageId.earliest(),
});
const msg = await reader.readNext();
console.log(msg.getData().toString());
await reader.close();
Pulsar Node.js readers have the following methods available:
Method | Description | Return type |
---|---|---|
readNext() | 接收 topic 中的下一条消息(类似于 consumer 中的 receive 方法)。 当消息可用时,Promise 对象运行 executor 函数并获取消息对象。 | Promise<Object> |
readNext(Number) | 在指定的超时时间(以毫秒为单位)内从 topic 接收单条消息。 | Promise<Object> |
hasNext() | 返回 broker 是否在目标 topic 中存有下一条消息。 | |
close() | 关闭 reader,使 reader 不再从 broker 接收消息。 | Promise<null> |
Reader configuration
Parameter | Description | 默认值 |
---|---|---|
topic | Reader 创建订阅并监听消息的目标 Pulsar topic。 | |
startMessageId | Reader 的初始位置,即 reader 处理的第一条消息的位置。 The options are Pulsar.MessageId.earliest (the earliest available message on the topic), Pulsar.MessageId.latest (the latest available message on the topic), or a message ID object for a position that is not earliest or latest. | |
receiverQueueSize | 设置 reader 接收队列的大小,即在应用程序调用 readNext 之前允许堆积的消息数。 A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization. | 1000 |
readerName | Reader 的名称。 | |
subscriptionRolePrefix | The subscription role prefix. |
Reader 示例
This example creates a Node.js reader with the my-topic
topic, reads messages, and prints the content that arrive for 10 times:
const Pulsar = require('pulsar-client');
(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
// Create a reader
const reader = await client.createReader({
topic: 'my-topic',
startMessageId: Pulsar.MessageId.earliest(),
});
// read messages
for (let i = 0; i < 10; i += 1) {
const msg = await reader.readNext();
console.log(msg.getData().toString());
}
await reader.close();
await client.close();
})();
In Pulsar Node.js client, you have to construct producer message object for producer.
Here is an example message:
const msg = {
data: Buffer.from('Hello, Pulsar'),
partitionKey: 'key1',
properties: {
'foo': 'bar',
},
eventTimestamp: Date.now(),
replicationClusters: [
'cluster1',
'cluster2',
],
}
await producer.send(msg);
The following keys are available for producer message objects:
消息对象操作
In Pulsar Node.js client, you can receive (or read) message object as consumer (or reader).
The message object have the following methods available:
Method | Description | Return type |
---|---|---|
getTopicName() | Topic 名称的 Getter 方法。 | String |
getProperties() | 属性的 Getter 方法。 | 数组<Object> |
getData() | 消息数据的 Getter方法。 | Buffer |
getMessageId() | 的 Getter 方法。 | 对象 |
getPublishTimestamp() | 发布时间戳的 Getter 方法。 | 数值 |
getEventTimestamp() | 事件时间戳的 Getter 方法。 | 数值 |
getPartitionKey() | 分区键的 Getter 方法。 | String |
消息 ID 对象操作
In Pulsar Node.js client, you can get message id object from message object.
The message id object have the following methods available:
Method | Description | Return type |
---|---|---|
serialize() | 序列化消息 id 到缓存中进行存储。 | Buffer |
toString() | 获取消息 id 字符串。 | String |
The following static methods are available for the message id object:
Method | Description | Return type |
---|---|---|
earliest() | MessageId 表示存储在 topic 中最早/最旧的可用消息。 | 对象 |
latest() | MessageId 表示存储在 topic 中最晚/最新的可用消息。 | 对象 |
从缓存中反序列化出消息 id。 | 对象 |