Architecture Overview

    单个 Pulsar 集群由以下三部分组成:

    • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
    • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的。
    • 一个Zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

    下图为一个 Pulsar 集群:

    在更细粒度的实例级别, 有一个能访问到全部实例的ZooKeeper群集处理涉及多个pulsar集群的配置协调任务, 例如 异地复制

    Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:

    • An HTTP server that exposes a API for both administrative tasks and topic lookup for producers and consumers. The producers connect to the brokers to publish messages and the consumers connect to the brokers to consume the messages.
    • 一个调度分发器, 它是异步的TCP服务器,通过自定义 应用于所有相关的数据传输。

    Messages are typically dispatched out of a managed ledger cache for the sake of performance, unless the backlog exceeds the cache size. 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

    最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用重新发布到其他区域

    集群

    A Pulsar instance consists of one or more Pulsar clusters. Clusters, in turn, consist of:

    • 一个或者多个Pulsar
    • 一个ZooKeeper协调器,用于集群级别的配置和协调
    • 一组BookKeeper的Bookies用于消息的 持久化存储

    集群间可以通过进行消息同步

    如何管理Pulsar集群,请参考clusters指南

    Pulsar 元数据存储维护 Pulsar 集群的全部元数据,比如主题元数据、Schema、Broker 负载数据等等。 Pulsar 使用 进行元数据存储、集群配置和协调。 Pulsar 元数据存储可以部署在单独的 ZooKeeper 集群或者是部署在已有的 ZooKeeper 集群。 你可以将 ZooKeeper 用作 Pulsar 元数据存储和 BookKeeper 元数据存储。 如果想将部署的 Pulsar broker 连接到一个已有的 BookKeeper 集群,你需要部署单独的 ZooKeeper 集群分别用作 Pulsar 元数据存储和 BookKeeper 元数据存储。

    • 配置存储 Quorum 存储了租户、命名空间和其他需要全局一致的配置项。
    • 每个集群有自己独立的本地 ZooKeeper 保存集群内部配置和协调信息,例如 broker 负责哪几个主题及所有权归属元数据、broker 负载报告,BookKeeper ledger 元数据(这个是 BookKeeper 本身所依赖的)等等。

    配置存储(Configuration Store)

    配置存储维护所有 Pulsar 集群的配置信息,比如集群、租户、命名空间、分区主题相关的配置等等。 Pulsar 实例可能有一个本地集群、多个本地集群,或者多个跨区域集群。 因此,配置存储可以在 Pulsar 实例下跨多个集群共享配置。 配置存储可以部署在单独的 ZooKeeper 集群或者是部署在已有的 ZooKeeper 集群。

    Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.

    为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达。 This mode of messaging is commonly called persistent messaging. 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储。

    Pulsar用 作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

    • 为按条目复制的顺序数据提供了非常高效的存储。
    • 保证了多系统挂掉时ledgers的读取一致性。
    • 提供不同的Bookies之间均匀的IO分布的特性。
    • It’s horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
    • Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

    In addition to message data, cursors are also persistently stored in BookKeeper. Cursors是消费端消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。

    At the moment, Pulsar supports persistent message storage. This accounts for the in all topic names. 下面是一个示例:

    下图展示了brokers和bookies是如何交互的

    Brokers和bookies

    Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:

    • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。
    • 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。
    • 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)。

    Ledger读一致性

    BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。

    Managed ledgers

    Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。

    1. 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。
    2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

    In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 参数配置)的时候,新的日志文件会被创建。

    Pulsar 代理

    Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。

    The Pulsar proxy provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. 如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信。

    为了性能和容错,你可以运行任意个Pulsar proxy。

    架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。 下面是一个示例:

    关于Pulsar proxy有一些比较重要的注意点:

    • Connecting clients don’t need to provide any specific configuration to use the Pulsar proxy. 除了更新用于服务URL的IP之外,你不需要为现有的应用更新客户端配置(例如你在Pulsar proxy上层架设运行了负载均衡器)。

    需要能够使用单个 URL 与整个 Pulsar 实例进行通信。 Pulsar内部提供了服务发现的机制,你可以通过 配置Pulsar实例指南设置。

    你也可以用你自己的服务发现系统。 If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as , the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

    下面这张图展示了Pulsar服务发现机制:

    图中,Pulsar集群可以通过一个DNS名称寻址:。 例如,可以像这样访问这个Pulsar集群: