Pulsar binary protocol specification

Clients and brokers exchange commands with each other. Commands are formatted as binary protocol buffer (aka protobuf) messages. The format of the protobuf commands is specified in the PulsarApi.proto file and is also documented in the sections below.

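All commands associated with the Pulsar protocol are contained in a BaseCommand protobuf message. This message includes a Type enum listing all possible subcommands, and each subcommand is an optional field. A BaseCommand message can specify only one subcommand.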

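Since protobuf doesn't provide any sort of message framing, every message in the Pulsar protocol is prefixed with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.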

The Pulsar protocol allows for two types of commands:

  1. Simple commands that do not carry a message payload.
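  2. Payload commands that carry a payload, used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf metadata and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big-endian integers.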

Message payloads are not encoded in protobuf but rather passed in raw format for efficiency reasons.

Simple commands

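The basic structure of a simple command (one without a payload) is:

  • totalSize (4 bytes) → The size of the frame, counting everything that comes after it
  • commandSize (4 bytes) → The size of the protobuf-serialized command
  • message → The serialized BaseCommand protobuf message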
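
As a rough illustration of this framing, the Python sketch below wraps an already-serialized command in two 4-byte big-endian size fields (totalSize, which counts everything after itself, followed by commandSize) and unwraps it again. The function names are hypothetical, and the command bytes are a placeholder: producing a real serialized BaseCommand would require classes generated from PulsarApi.proto, which are not shown here.

```python
import struct


def encode_simple_frame(command_bytes: bytes) -> bytes:
    """Frame a serialized command as [totalSize][commandSize][command]."""
    command_size = len(command_bytes)
    # totalSize counts everything after itself: the commandSize field plus the command.
    total_size = 4 + command_size
    # ">II" = two 4-byte unsigned big-endian integers.
    return struct.pack(">II", total_size, command_size) + command_bytes


def decode_simple_frame(frame: bytes) -> bytes:
    """Return the serialized command carried by a simple-command frame."""
    total_size, command_size = struct.unpack_from(">II", frame, 0)
    if total_size != len(frame) - 4:
        raise ValueError("totalSize does not match frame length")
    if command_size > total_size - 4:
        raise ValueError("commandSize exceeds frame")
    return frame[8:8 + command_size]
```

For a 2-byte placeholder command, `encode_simple_frame(b"\x08\x02")` yields the bytes `00000006 00000002 0802`: a totalSize of 6 (4 bytes of commandSize plus 2 bytes of command), then the commandSize, then the command itself.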

Payload commands

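The basic structure of a payload command is:

  • totalSize (4 bytes) → The size of the frame, counting everything that comes after it
  • commandSize (4 bytes) → The size of the protobuf-serialized command
  • message → The serialized BaseCommand protobuf message
  • magicNumber (2 bytes) → 0x0e01, identifying the current format
  • checksum (4 bytes) → A CRC32-C checksum of everything that comes after it
  • metadataSize (4 bytes) → The size of the message metadata
  • metadata → The message metadata, stored as a serialized protobuf message
  • payload → Anything left in the frame is considered the payload and can include any sequence of bytes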
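
A minimal sketch of payload-command framing follows. It assumes the layout totalSize | commandSize | command | magicNumber (0x0e01) | checksum | metadataSize | metadata | payload, where the checksum is a CRC32-C over everything after it. As before, the command and metadata are treated as opaque, already-serialized bytes, and the function names are hypothetical. CRC32-C is not in Python's standard library (`zlib.crc32` computes the IEEE CRC-32, a different polynomial), so a slow bitwise version is included.

```python
import struct
from typing import Tuple

MAGIC_CRC32C = b"\x0e\x01"  # magicNumber 0x0e01


def crc32c(data: bytes) -> int:
    """Bitwise CRC32-C (Castagnoli, reflected polynomial 0x82F63B78).

    Slow but dependency-free; production code would use a table-driven
    or hardware-accelerated implementation.
    """
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0x82F63B78
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF


def encode_payload_frame(command: bytes, metadata: bytes, payload: bytes) -> bytes:
    # The checksum covers everything after it: metadataSize, metadata and payload.
    checked = struct.pack(">I", len(metadata)) + metadata + payload
    body = (
        struct.pack(">I", len(command)) + command
        + MAGIC_CRC32C
        + struct.pack(">I", crc32c(checked))
        + checked
    )
    # totalSize counts everything after the totalSize field itself.
    return struct.pack(">I", len(body)) + body


def decode_payload_frame(frame: bytes) -> Tuple[bytes, bytes, bytes]:
    """Return (command, metadata, payload) after validating magic and checksum."""
    (total_size,) = struct.unpack_from(">I", frame, 0)
    if total_size != len(frame) - 4:
        raise ValueError("totalSize does not match frame length")
    (command_size,) = struct.unpack_from(">I", frame, 4)
    pos = 8 + command_size
    command = frame[8:pos]
    if frame[pos:pos + 2] != MAGIC_CRC32C:
        raise ValueError("unexpected magic number")
    (checksum,) = struct.unpack_from(">I", frame, pos + 2)
    checked = frame[pos + 6:]
    if crc32c(checked) != checksum:
        raise ValueError("checksum mismatch")
    (metadata_size,) = struct.unpack_from(">I", checked, 0)
    return command, checked[4:4 + metadata_size], checked[4 + metadata_size:]
```

Because the checksum sits in front of the metadata and payload, a receiver can reject a corrupted message before attempting to parse its metadata.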

Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.

When batch messages are used, the payload contains a list of entries, each with its own metadata, defined by the SingleMessageMetadata object.

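Within a batch, each entry in the payload has the following format:

  • metadataSize (4 bytes) → The size of the serialized single-message metadata
  • metadata → The single-message metadata
  • payload → The message payload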

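Each single-message metadata has the following fields:

  • properties → (optional) Application-defined properties
  • partition_key → (optional) Key used to hash the message to a specific partition
  • payload_size → Size of the payload for this single message in the batch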

When compression is enabled, the whole batch is compressed at once.
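
The sketch below packs and unpacks batch entries, assuming each entry is [metadataSize (4-byte big-endian)][SingleMessageMetadata][payload]. Because the payload length is carried inside the metadata (payload_size) rather than framed separately, the decoder takes a caller-supplied `payload_size_of` callback as a stand-in for real protobuf decoding. The number of entries (`num_messages`) is assumed to be known from the enclosing message metadata. All names are hypothetical.

```python
import struct
from typing import Callable, List, Tuple

Entry = Tuple[bytes, bytes]  # (serialized SingleMessageMetadata, payload)


def encode_batch(entries: List[Entry]) -> bytes:
    out = bytearray()
    for metadata, payload in entries:
        # [metadataSize: 4-byte big-endian][metadata][payload]
        out += struct.pack(">I", len(metadata))
        out += metadata
        out += payload
    return bytes(out)


def decode_batch(data: bytes, num_messages: int,
                 payload_size_of: Callable[[bytes], int]) -> List[Entry]:
    entries: List[Entry] = []
    pos = 0
    for _ in range(num_messages):
        (metadata_size,) = struct.unpack_from(">I", data, pos)
        pos += 4
        metadata = data[pos:pos + metadata_size]
        pos += metadata_size
        # The payload length comes from the metadata's payload_size field,
        # decoded here by the caller's callback.
        size = payload_size_of(metadata)
        entries.append((metadata, data[pos:pos + size]))
        pos += size
    if pos != len(data):
        raise ValueError("batch length does not match its entries")
    return entries
```

With compression enabled, the whole output of `encode_batch` would be compressed as a single unit rather than entry by entry.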

Connection establishment

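After opening a TCP connection to a broker, typically on port 6650, the client is responsible for initiating the session.

After receiving a Connected response from the broker, the client can consider the connection ready to use. If instead the broker fails to validate the client's authentication, it replies with an Error command and closes the TCP connection.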

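Example of a Connect command:

    message CommandConnect {
      "client_version" : "Pulsar-Client-Java-v1.15.2",
      "auth_method_name" : "my-authentication-plugin",
      "auth_data" : "my-auth-data",
      "protocol_version" : 6
    }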

Fields:

  • client_version → String-based identifier. Format is not enforced
  • auth_method_name → (optional) Name of the authentication plugin if auth is enabled
  • auth_data → (optional) Plugin-specific authentication data
  • protocol_version → Indicates the protocol version supported by the client. The broker will not send commands introduced in newer revisions of the protocol. The broker might enforce a minimum version

Example of the broker's Connected response:

    message CommandConnected {
      "server_version" : "Pulsar-Broker-v1.15.2",
      "protocol_version" : 6
    }

Fields:

  • server_version → String identifier of broker version
  • protocol_version → Protocol version supported by the broker. Client must not attempt to send commands introduced in newer revisions of the protocol
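
The version rules above amount to both peers limiting themselves to the lower of the two advertised versions. This is a minimal sketch of that check. The `command_introduced_in` values are hypothetical inputs, since the revision in which each command was introduced is not listed in this document.

```python
def negotiated_version(client_version: int, broker_version: int) -> int:
    """Highest protocol revision both sides can use."""
    return min(client_version, broker_version)


def client_may_send(command_introduced_in: int, broker_version: int) -> bool:
    """A client must not send commands newer than the broker's protocol_version."""
    return command_introduced_in <= broker_version
```

For example, a client advertising version 10 that receives `"protocol_version" : 6` would restrict itself to commands introduced in revision 6 or earlier.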

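Keep Alive

To identify prolonged network partitions between clients and brokers, or cases in which a machine crashes without the TCP connection being closed on the remote end (e.g. power outage, kernel panic, hard reboot), the protocol includes a mechanism to probe the availability status of the remote peer.

Both clients and brokers periodically send Ping commands, and they close the socket if a Pong response is not received within a timeout (the default used by the broker is 60s).

A valid Pulsar client implementation is not required to send Ping probes itself, but it must reply promptly with a Pong after receiving a Ping from the broker, to prevent the broker from forcibly closing the TCP connection.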
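
A hypothetical liveness tracker for one connection is sketched below. Commands are modeled as plain strings, and the class answers a Ping with a Pong immediately. It also treats any incoming command as proof that the peer is alive. That is an assumption made here for simplicity, since the text above only requires a Pong within the timeout. The `now` parameter lets callers (and tests) inject a clock value.

```python
import time
from typing import Optional


class KeepAlive:
    """Hypothetical per-connection liveness tracker (not a Pulsar client API)."""

    def __init__(self, timeout_s: float = 60.0) -> None:
        self.timeout_s = timeout_s
        self.last_received = time.monotonic()

    def on_command(self, command: str, now: Optional[float] = None) -> Optional[str]:
        # Assumption: any incoming command refreshes the liveness timestamp.
        self.last_received = time.monotonic() if now is None else now
        if command == "PING":
            # Reply right away so the broker does not close the connection.
            return "PONG"
        return None

    def should_close(self, now: Optional[float] = None) -> bool:
        current = time.monotonic() if now is None else now
        return current - self.last_received > self.timeout_s
```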

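Producer

In order to send messages, a client needs to establish a producer. When creating a producer, the broker first verifies that this particular client is authorized to publish on the topic.

Once the client gets confirmation that the producer has been created, it can publish messages to the broker, referring to the producer ID negotiated at creation time.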

Producer interaction

Producer command

    message CommandProducer {
      "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
      "producer_id" : 1,
      "request_id" : 1
    }

Parameters:

  • topic → Complete name of the topic on which you want to create the producer
  • producer_id → Client-generated producer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • producer_name → (optional) If a producer name is specified, that name is used; otherwise the broker generates a unique name. A generated producer name is guaranteed to be globally unique. Implementations are expected to let the broker generate a new producer name when the producer is initially created, then reuse it when recreating the producer after reconnections
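
Since producer_id, consumer_id and request_id only need to be unique within one connection, a client can keep simple per-connection counters and a table of in-flight requests keyed by request_id. The sketch below is a hypothetical helper, not a Pulsar client API. Starting the counters at 0 is an arbitrary choice; any scheme works as long as the IDs stay unique on the connection.

```python
import itertools
from typing import Dict


class ConnectionIds:
    """Hypothetical per-connection ID allocation and request/response matching."""

    def __init__(self) -> None:
        self._producer_ids = itertools.count(0)
        self._consumer_ids = itertools.count(0)
        self._request_ids = itertools.count(0)
        # request_id -> short description of the in-flight request
        self.pending: Dict[int, str] = {}

    def new_producer_id(self) -> int:
        return next(self._producer_ids)

    def new_consumer_id(self) -> int:
        return next(self._consumer_ids)

    def start_request(self, description: str) -> int:
        request_id = next(self._request_ids)
        self.pending[request_id] = description
        return request_id

    def complete_request(self, request_id: int) -> str:
        """Match a response (e.g. ProducerSuccess or Error) to its request.

        Raises KeyError if the request_id is unknown or already completed.
        """
        return self.pending.pop(request_id)
```

A client following the producer_name guidance above would store the name returned in ProducerSuccess and pass it back when recreating the producer on a new connection.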

The broker replies with either a ProducerSuccess or an Error command.

Producer success command

    message CommandProducerSuccess {
      "request_id" : 1,
      "producer_name" : "generated-unique-producer-name"
    }

Parameters:

  • request_id → Original id of the CreateProducer request
  • producer_name → Generated globally unique producer name or the name specified by the client, if any.
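
Send command

The Send command is used to publish a new message within the context of an already existing producer. It is used in a frame that includes the command as well as the message payload, whose complete format is described in the payload commands section.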

    message CommandSend {
      "producer_id" : 1,
      "sequence_id" : 0,
      "num_messages" : 1
    }

Parameters:

  • producer_id → id of an existing producer
  • sequence_id → Each message has an associated sequence ID, which is expected to be implemented with a counter starting at 0. The SendReceipt that acknowledges the effective publishing of a message refers to it by its sequence ID
  • num_messages → (optional) Used when publishing a batch of messages at once

Send receipt command

After a message has been persisted on the configured number of replicas, the broker sends the acknowledgment receipt to the producer.

Parameters:

  • producer_id → id of producer originating the send request
  • sequence_id → sequence id of the published message
  • message_id → Message ID assigned by the system to the published message, unique within a single cluster. The message ID is composed of 2 longs, ledgerId and entryId, which reflect that this unique ID is assigned when appending to a BookKeeper ledger
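
On the producer side, the sequence ID counter and the SendReceipt fit together as a table of pending sends. In the sketch below, each `send` takes the next sequence ID from a counter starting at 0, and each receipt removes the matching entry and returns the (ledgerId, entryId) message ID assigned by the broker. The class and method names are hypothetical.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class MessageId:
    ledger_id: int  # BookKeeper ledger the entry was appended to
    entry_id: int   # position of the entry within that ledger


class PendingSends:
    """Hypothetical producer-side table of Send commands awaiting a SendReceipt."""

    def __init__(self, producer_id: int) -> None:
        self.producer_id = producer_id
        self._sequence_ids = itertools.count(0)  # counter starting at 0
        self.pending: Dict[int, bytes] = {}

    def send(self, payload: bytes) -> int:
        sequence_id = next(self._sequence_ids)
        self.pending[sequence_id] = payload
        return sequence_id  # would go into CommandSend.sequence_id

    def on_receipt(self, producer_id: int, sequence_id: int,
                   ledger_id: int, entry_id: int) -> Tuple[bytes, MessageId]:
        if producer_id != self.producer_id:
            raise ValueError("receipt addressed to a different producer")
        payload = self.pending.pop(sequence_id)  # KeyError if unknown
        return payload, MessageId(ledger_id, entry_id)
```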

Close producer command

Note: This command can be sent by either producer or broker.

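When receiving a CloseProducer command, the broker stops accepting any more messages for the producer, waits until all pending messages are persisted, and then replies Success to the client.

The broker can also send a CloseProducer command to the client when it is performing a graceful failover (e.g. the broker is being restarted, or the topic is being unloaded by the load balancer to be transferred to a different broker).

When receiving a CloseProducer command, the client is expected to go through the service discovery lookup again and recreate the producer. The TCP connection is not affected.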

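Consumer

A consumer is used to attach to a subscription and consume messages from it. After every reconnection, a client needs to subscribe to the topic again. If the subscription does not already exist, a new one is created.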

Flow control

After the consumer is ready, the client needs to give permission to the broker to push messages. This is done with the Flow command.

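A Flow command gives additional permits to send messages to the consumer. A typical consumer implementation uses a queue to accumulate these messages until the application is ready to consume them.

After the application has dequeued half of the messages in the queue, the consumer sends permits to the broker to ask for more messages (equal to half of the queue size).

For example, if the queue size is 1000 and the application has consumed 500 messages from the queue, the consumer sends permits to the broker asking for 500 more messages.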
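
The half-queue refill policy can be sketched as a small counter. The consumer grants permits for the whole queue up front. Granting the full queue size initially is an assumption here, though it matches the 1000-permit Flow example in this section. After that, once the application has taken half a queue's worth of messages, the controller returns that count as the messagePermits for the next Flow command. The class name is hypothetical.

```python
from typing import Optional


class FlowController:
    """Hypothetical consumer-side permit accounting for a receiver queue."""

    def __init__(self, queue_size: int) -> None:
        self.queue_size = queue_size
        self.threshold = max(1, queue_size // 2)
        self.consumed_since_flow = 0

    def initial_permits(self) -> int:
        # Assumption: the first Flow grants permits for the full queue.
        return self.queue_size

    def on_message_consumed(self) -> Optional[int]:
        """Call when the application dequeues a message.

        Returns the messagePermits for a new Flow command, or None.
        """
        self.consumed_since_flow += 1
        if self.consumed_since_flow >= self.threshold:
            permits = self.consumed_since_flow
            self.consumed_since_flow = 0
            return permits
        return None
```

With a queue size of 1000, the first 499 calls return None and the 500th returns 500, matching the example above.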

Subscribe command

    message CommandSubscribe {
      "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
      "subscription" : "my-subscription-name",
      "subType" : "Exclusive",
      "consumer_id" : 1,
      "request_id" : 1
    }

Parameters:

  • topic → Complete topic name to where you want to create the consumer on
  • subscription → Subscription name
  • subType → Subscription type: Exclusive, Shared, Failover, Key_Shared
  • consumer_id → Client generated consumer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • consumer_name → (optional) Clients can specify a consumer name. This name can be used to track a particular consumer in the stats. In the Failover subscription type, the name is also used to decide which consumer is elected as master (the one receiving messages): consumers are sorted by their consumer name and the first one is elected master
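
The Failover election rule stated for consumer_name reduces to picking the smallest name. This sketch applies only that rule. Any additional factors a real broker may weigh are not described here and are not modeled. The function name is hypothetical.

```python
from typing import List


def elect_failover_master(consumer_names: List[str]) -> str:
    """Sort consumers by name and elect the first one as master."""
    if not consumer_names:
        raise ValueError("no consumers attached to the subscription")
    return min(consumer_names)  # same as sorted(consumer_names)[0]
```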

Flow command

    message CommandFlow {
      "consumer_id" : 1,
      "messagePermits" : 1000
    }

Parameters:

  • consumer_id → Id of an already established consumer
  • messagePermits → Number of additional permits to grant to the broker for pushing more messages

Message command

The Message command is used by the broker to push messages to an existing consumer, within the limits of the given permits.

This command is used in a frame that also includes the message payload, whose complete format is described in the payload commands section.

    message CommandMessage {
      "consumer_id" : 1,
      "message_id" : {
        "ledgerId" : 123,
        "entryId" : 456
      }
    }
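
From the broker's point of view, "within the limits of the given permits" means a per-consumer counter. Flow adds to the counter, each pushed Message consumes one permit, and pushing stops at zero. The sketch below is a hypothetical, simplified model that counts one permit per message and ignores batching.

```python
from collections import deque
from typing import Deque, List


class DispatchState:
    """Hypothetical broker-side dispatch state for one consumer."""

    def __init__(self) -> None:
        self.permits = 0
        self.backlog: Deque[bytes] = deque()

    def on_flow(self, message_permits: int) -> List[bytes]:
        self.permits += message_permits
        return self.dispatch()

    def publish(self, message: bytes) -> List[bytes]:
        self.backlog.append(message)
        return self.dispatch()

    def dispatch(self) -> List[bytes]:
        """Push backlog messages while the consumer still has permits."""
        sent = []
        while self.permits > 0 and self.backlog:
            sent.append(self.backlog.popleft())
            self.permits -= 1
        return sent
```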

Ack command

An Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

In addition, the broker will also maintain the consumer position based on the acknowledged messages.

    message CommandAck {
      "consumer_id" : 1,
      "ack_type" : "Individual",
      "message_id" : {
        "ledgerId" : 123,
        "entryId" : 456
      }
    }

Parameters:

  • consumer_id → Id of an already established consumer
  • ack_type → Type of acknowledgment: Individual or Cumulative
  • message_id → Id of the message to acknowledge
  • validation_error → (optional) Indicates that the consumer has discarded the messages due to: UncompressedSizeCorruption, DecompressionError, ChecksumMismatch, BatchDeSerializeError
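
The difference between the two ack types can be sketched with a set of unacknowledged message IDs. An Individual ack removes exactly one ID. A Cumulative ack removes every ID up to and including the given one. The sketch assumes message IDs within one topic are ordered by (ledgerId, entryId). The class is a hypothetical, simplified consumer-side view, not a client API.

```python
from dataclasses import dataclass
from typing import Set


@dataclass(frozen=True, order=True)
class MessageId:
    ledger_id: int
    entry_id: int


class AckTracker:
    """Hypothetical consumer-side view of unacknowledged messages."""

    def __init__(self) -> None:
        self.unacked: Set[MessageId] = set()

    def received(self, mid: MessageId) -> None:
        self.unacked.add(mid)

    def ack(self, mid: MessageId, ack_type: str = "Individual") -> None:
        if ack_type == "Individual":
            self.unacked.discard(mid)
        elif ack_type == "Cumulative":
            # Everything up to and including `mid` counts as acknowledged.
            self.unacked = {m for m in self.unacked if m > mid}
        else:
            raise ValueError(f"unknown ack_type: {ack_type}")
```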

Close consumer command

Note: This command can be sent by either the consumer (client) or the broker.

This command behaves the same as CloseProducer.

Redeliver unacknowledged messages command

The protobuf object accepts a list of message ids that the consumer wants to be redelivered. If the list is empty, the broker will redeliver all the pending messages.

On redelivery, messages can be sent to the same consumer or, in the case of a shared subscription, spread across all available consumers.
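
The empty-list rule for redelivery is sketched below. Message IDs are modeled as (ledgerId, entryId) tuples. Skipping requested IDs that are no longer pending (for example, already acknowledged) is an assumption of this sketch. The function name is hypothetical.

```python
from typing import Iterable, List, Set, Tuple

MessageId = Tuple[int, int]  # (ledgerId, entryId)


def messages_to_redeliver(pending: Set[MessageId],
                          requested: Iterable[MessageId]) -> List[MessageId]:
    """Pick the messages to resend for a redeliver-unacknowledged request."""
    requested_ids = list(requested)
    if not requested_ids:
        # Empty list: redeliver every pending (unacknowledged) message.
        return sorted(pending)
    # Assumption: IDs that are no longer pending are skipped.
    return sorted(m for m in requested_ids if m in pending)
```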

Reached end of topic command

This is sent by a broker to a particular consumer, whenever the topic has been “terminated” and all the messages on the subscription were acknowledged.

The client should use this command to notify the application that no more messages are coming from the consumer.

Consumer stats command

This command is sent by the client to retrieve Subscriber and Consumer level stats from the broker.

Parameters:

  • request_id → Id of the request, used to correlate the request and the response.
  • consumer_id → Id of an already established consumer.

Consumer stats response command

This is the broker’s response to ConsumerStats request by the client. It contains the Subscriber and Consumer level stats of the consumer_id sent in the request. If the error_code or the error_message field is set it indicates that the request has failed.

Unsubscribe command

This command is sent by the client to unsubscribe the consumer_id from the associated topic.

Parameters:

  • request_id → Id of the request.
  • consumer_id → Id of an already established consumer which needs to unsubscribe.

Topic lookup

Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

Lookup can be done with a REST call as described in the docs.

Since Pulsar-1.16 it is also possible to perform the lookup within the binary protocol.

For the sake of example, let’s assume we have a service discovery component running at pulsar://broker.example.com:6650

Individual brokers will be running at pulsar://broker-1.example.com:6650, pulsar://broker-2.example.com:6650, …

A client can use a connection to the discovery service host to issue a LookupTopic command. The response can be either a broker hostname to connect to, or a broker hostname at which to retry the lookup.

The LookupTopic command has to be used in a connection that has already gone through the Connect / Connected initial handshake.

LookupTopic command

Fields:

  • topic → Topic name to lookup
  • request_id → Id of the request that will be passed with its response
  • authoritative → Initial lookup request should use false. When following a redirect response, client should pass the same value contained in the response
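
The redirect-following behavior can be sketched as a loop. The first request goes to the discovery URL with authoritative=false, and each Redirect re-sends the lookup to the returned broker, passing back the response's authoritative flag. A Connect response ends the loop. `send_lookup` is a hypothetical transport function, assumed to issue LookupTopic on a connection that has already completed the Connect/Connected handshake. Failure responses are reduced to an exception, and the redirect cap is an arbitrary safeguard.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class LookupResponse:
    response: str            # "Connect" or "Redirect" (failures not modeled)
    broker_service_url: str
    authoritative: bool


# Hypothetical transport: (service_url, topic, authoritative) -> response.
LookupFn = Callable[[str, str, bool], LookupResponse]


def lookup_topic(send_lookup: LookupFn, discovery_url: str, topic: str,
                 max_redirects: int = 10) -> str:
    url, authoritative = discovery_url, False  # initial lookup uses false
    for _ in range(max_redirects + 1):
        resp = send_lookup(url, topic, authoritative)
        if resp.response == "Connect":
            return resp.broker_service_url
        if resp.response != "Redirect":
            raise RuntimeError(f"lookup failed: {resp.response}")
        # Retry at the returned broker, passing back its authoritative flag.
        url, authoritative = resp.broker_service_url, resp.authoritative
    raise RuntimeError("too many lookup redirects")
```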

LookupTopicResponse command

Example of response with successful lookup:

    message CommandLookupTopicResponse {
      "request_id" : 1,
      "response" : "Connect",
      "brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
      "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
      "authoritative" : true
    }

Example of lookup response with redirection:

    message CommandLookupTopicResponse {
      "request_id" : 1,
      "response" : "Redirect",
      "brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
      "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
      "authoritative" : true
    }

In this second case, we need to reissue the LookupTopic command request to broker-2.example.com and this broker will be able to give a definitive answer to the lookup request.

Partitioned topics discovery

Partitioned topics metadata discovery is used to find out if a topic is a “partitioned topic” and how many partitions were set up.

If the topic is marked as “partitioned”, the client is expected to create multiple producers or consumers, one for each partition, using the partition-X suffix.

This information only needs to be retrieved the first time a producer or consumer is created. There is no need to do this after reconnections.
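
Deriving the per-partition topic names from the partition count is sketched below, using the -partition-X suffix. Two assumptions apply here: X is taken to be 0-based, and a partition count of 0 is taken to mean the topic is not partitioned. The function name is hypothetical.

```python
from typing import List


def partition_topic_names(topic: str, partitions: int) -> List[str]:
    """One topic name per partition, using the -partition-X suffix."""
    if partitions <= 0:
        return [topic]  # assumption: 0 partitions means a non-partitioned topic
    return [f"{topic}-partition-{i}" for i in range(partitions)]
```

A client would then create one producer or consumer for each returned name.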

The discovery of partitioned topics metadata works very similarly to the topic lookup. The client sends a request to the service discovery address, and the response contains the actual metadata.

PartitionedTopicMetadata command

    message CommandPartitionedTopicMetadata {
      "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
      "request_id" : 1
    }

Fields:

  • topic → the topic for which to check the partitions metadata
  • request_id → Id of the request that will be passed with its response

PartitionedTopicMetadataResponse command

Example of response with metadata:

    message CommandPartitionedTopicMetadataResponse {
      "request_id" : 1,
      "response" : "Success",
      "partitions" : 32
    }