推荐使用单机模式的 Pulsar 进行开发,在本地开发环境中启用 WebSocket 服务。
在非单机模式下,有两种方法可以部署 WebSocket 服务:
- Pulsar Broker
- 作为一个独立的组件
在这种模式下,WebSocket 服务会使用已经在 broker 中运行的 HTTP 服务。 要启用此模式,需在安装目录下的 文件中设置 webSocketServiceEnabled
参数。
作为一个独立的组件
在这种模式下,WebSocket 会作为单独的服务在 Pulsar broker 上运行。 运行此模式,需在 文件中进行配置。 You’ll need to set at least the following parameters:
Here’s an example:
configurationStoreServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster
启动 Broker
配置完成后,你可以使用 pulsar-daemon
命令来启动服务:
$ bin/pulsar-daemon start websocket
Pulsar 的 WebSocket API 提供三个端点,用于、消费消息和。
所有通过 WebSocket API 的数据都使用 JSON 进行交互。
ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
查询参数
发布消息
{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
payload | string | 是 | Base-64 编码的负载 |
properties | 键值对 | 否 | 应用程序定义的属性 |
context | string | 否 | 应用程序定义的请求标识符 |
key | string | 否 | 分区 topic 中使用的分区 |
replicationClusters | 数组 | 否 | 根据名称允许添加到集群列表的副本 |
响应成功示例
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
响应失败示例
{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
result | string | 是 | 发送成功则为 ok ,否则抛出异常 |
messageId | string | 是 | 已发布消息的 Message ID |
context | string | 否 | 应用程序定义的请求标识符 |
Consumer 端
Concumer 端要求在 URL 中指定租户、命名空间、topic 和订阅:
查询参数
注意:以上参数(pullMode
除外)适用于 WebSocket 服务的内部 consumer。 因此,即使客户端没有在 WebSocket 上消费,只要消息进入接收队列,就会受到传递设置的约束。
接收消息
Server will push messages on the WebSocket session:
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 消息 ID |
payload | string | 是 | Base-64 编码的负载 |
publishTime | string | 是 | 发布时间戳 |
properties | 键值对 | 否 | 应用程序定义的属性 |
key | string | 否 | Producer 设置的原始路由密钥 |
ACK 确认消息
Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.
{
"messageId": "CAAQAw=="
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 处理消息的消息ID |
Flow control
推送模式
默认情况下(pullMode=false
),consumer 端使用 receiverQueueSize
参数设置内部接收队列的大小,并限制传递到 WebSocket 客户端的未确认消息数。 在这种模式下,如果不发送消息确认,发送到 WebSocket 客户端的消息达到 receiverQueueSize
时,Pulsar WebSocket 将停止发送消息。
拉取模式
如果设置 pullMode
为 true
,则 WebSocket 客户端需要使用 permit
命令允许 Pulsar WebSocket 服务发送更多消息。
{
"type": "permit",
"permitMessages": 100
}
注意:在这种模式下,可以在不同的连接中确认消息。
Reader 端
The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:
ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
查询参数
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
readerName | string | 否 | Reader name |
receiverQueueSize | int | 否 | Consumer 接收队列的大小(默认:1000) |
messageId | int or enum | 否 | Message ID to start from, earliest or latest (default: latest ) |
接收消息
Server will push messages on the WebSocket session:
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785"
}
Key | 类型 | 是否必需 | 说明 |
---|---|---|---|
messageId | string | 是 | 消息 ID |
payload | string | 是 | Base-64 编码的负载 |
publishTime | string | 是 | 发布时间戳 |
properties | 键值对 | 否 | 应用程序定义的属性 |
key | string | 否 | Producer 设置的原始路由密钥 |
ACK 确认消息
{
"messageId": "CAAQAw=="
}
In case of error the server will close the WebSocket session using the following error codes:
Error Code | Error Message |
---|---|
1 | Failed to create producer |
2 | Failed to subscribe |
3 | Failed to deserialize from JSON |
4 | Failed to serialize to JSON |
5 | Failed to authenticate client |
6 | Client is not authorized |
7 | Invalid payload encoding |
8 | Unknown error |
应用程序负责在后台重新建立 WebSocket 连接。
Below you’ll find code examples for the Pulsar WebSocket API in Python and .
Python
This example uses the package. You can install it using pip:
You can also download it from .
Python producer
Here’s an example Python producer that sends a simple message to a Pulsar :
import websocket, base64, json
TOPIC = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
# Send one message as JSON
ws.send(json.dumps({
'payload' : base64.b64encode('Hello World'),
'properties': {
'key1' : 'value1',
'key2' : 'value2'
},
'context' : 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print 'Message published successfully'
else:
print 'Failed to publish message:', response
Python consumer
Here’s an example Python consumer that listens on a Pulsar topic and prints the message ID whenever a message arrives:
import websocket, base64, json
TOPIC = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Python reader
Here’s an example Python reader that listens on a Pulsar topic and prints the message ID whenever a message arrives:
import websocket, base64, json
TOPIC = 'ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Node.js
This example uses the package. You can install it using npm:
$ npm install ws
Node.js producer
var WebSocket = require('ws'),
topic = "ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic",
ws = new WebSocket(topic);
var message = {
"payload" : new Buffer("Hello World").toString('base64'),
"properties": {
"key1" : "value1",
"key2" : "value2"
},
"context" : "1"
};
ws.on('open', function() {
// Send one message
ws.send(JSON.stringify(message));
});
ws.on('message', function(message) {
console.log('received ack: %s', message);
});
Text
XPath: /pre[20]/code
Node.js consumer
Here’s an example Node.js consumer that listens on the same topic used by the producer above:
var WebSocket = require('ws'),
topic = "ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub",
ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));