底层实现上,目前官方版的Pulsar客户端支持对用户透明的连接重连、故障切换、未ack消息的缓冲、消息重传。
当应用程序要创建生产者/消费者时, Pulsar客户端库执行按以下两个步骤的工作:
- 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询Zookeeper中(缓存)的元数据,来确定这条消息的topic在哪个broker上,如果该topic不在任何一个broker上,则把这个topic分配在负载最少的broker上。
- 当客户端获取了broker的地址之后,将会创建一个TCP连接(或复用连接池中的连接)并且进行鉴权。 客户端和broker通过该连接交换基于自定义协议的二进制命令。 同时,客户端会向broker发送一条命令用以在broker上创建生产者/消费者,该命令将会在验证授权策略后生效。
每当 TCP 连接中断时, 客户端将立即重新启动此安装阶段, 并将继续尝试使用指数退避重新建立生产者或使用者, 直到操作成功为止。
Reader 接口
The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—-rather than a consumer—-you need to specify which message the reader begins reading from when it connects to a topic. 当连接到一个 topic 时,reader 接口支持的开始位置包括:
- The latest available message in the topic
- 如果你想开始的位置在最早和最新之间, 则需要显示的指定消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。
Pulsar的读取器接口在流计算场景下,对提供effective-once的语义很有帮助。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。
Reader 接口内部是作为一个使用独占、非持久化订阅的被随机命名的一个消费者来实现的。
[ IMPORTANT ]
Please also note that a reader can have a “backlog”, but the metric is just to allow users to know how behind the reader is and is not considered for any backlog quota calculations.
Reader 接口目前无法在上使用。
下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子
创建一个从其他位置(非最早可用且非最新可用消息处)读取消息的读取器