底层实现上,目前官方版的Pulsar客户端支持对用户透明的连接重连、故障切换、未ack消息的缓冲、消息重传。
当应用程序要创建生产者/消费者时, Pulsar客户端库执行按以下两个步骤的工作:
- 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询Zookeeper中(缓存)的元数据,来确定这条消息的topic在哪个broker上,如果该topic不在任何一个broker上,则把这个topic分配在负载最少的broker上。
- 当客户端获取了broker的地址之后,将会创建一个TCP连接(或复用连接池中的连接)并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。
Reader 接口
在Pulsar中, “标准” 消费者接口 涉及使用消费者监听 , 处理传入消息, 并在处理完这些消息后最终确认它们。 不论任何时候创建的一个新订阅,默认都会定位在 topic 的末尾位置,这意味着使用该订阅的消费者都只能接收在这之后新产生的消息。 如果消费者使用已经存在的订阅来连接 topic 时,它将从该订阅内最早的未确认消息开始读取。 总之,消费者接口是基于消息确认机制来自动管理订阅游标位置。
The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—-rather than a consumer—-you need to specify which message the reader begins reading from when it connects to a topic. 当连接到一个 topic 时,reader 接口支持的开始位置包括:
- The latest available message in the topic
- 如果你想开始的位置在最早和最新之间, 则需要显示的指定消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。
Pulsar的读取器接口在流计算场景下,对提供的语义很有帮助。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。
Reader 接口目前无法在有分区主题(partitioned topics)上使用。
下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子
创建一个从其他位置(非最早可用且非最新可用消息处)读取消息的读取器